# Demo PySpark

See the Apache Spark examples page: https://spark.apache.org/examples.html

In [1]:
import random
from collections import Counter

import numpy as np
import pandas as pd
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

In [2]:
# Reuse the active SparkContext when one already exists, so re-running this
# cell in a live kernel does not fail with an error about multiple
# SparkContexts; otherwise start a 'local' context named 'test'.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('test'))
# Only report log messages at ERROR level from here on.
sc.setLogLevel('ERROR')

24/11/27 11:06:48 WARN Utils: Your hostname, Peters-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 10.26.147.174 instead (on interface en0)
24/11/27 11:06:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/27 11:06:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/27 11:06:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Configure the builder first, then obtain the session from it.
session_builder = SparkSession.builder.appName("demo")
spark = session_builder.getOrCreate()

In [4]:
# Check which class the builder returned.
type(spark)

pyspark.sql.session.SparkSession

In [5]:
# Four (first_name, age) records used by the column and aggregation examples.
people_rows = [("sue", 32), ("li", 3), ("bob", 75), ("heo", 13)]
people_columns = ["first_name", "age"]
df = spark.createDataFrame(people_rows, people_columns)

In [6]:
# Print the four rows as a text table.
df.show()

                                                                                

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



In [7]:
from pyspark.sql.functions import col, when

# Bucket each age: under 13 -> "child", 13 through 19 inclusive -> "teenager",
# everything else -> "adult".
age = col("age")
life_stage = (
    when(age < 13, "child")
    .when(age.between(13, 19), "teenager")
    .otherwise("adult")
)
df1 = df.withColumn("life_stage", life_stage)

In [8]:
# Print the frame including the derived life_stage column.
df1.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [9]:
# Keep only the rows whose life_stage is "teenager" or "adult".
is_teen_or_adult = col("life_stage").isin("teenager", "adult")
df1.filter(is_teen_or_adult).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [10]:
from pyspark.sql.functions import avg

# Mean of the age column over the whole frame (no grouping).
df1.agg(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



In [11]:
# Average per life_stage group. avg() is called without column names; the
# result contains a single avg(age) column for this frame.
df1.groupBy("life_stage").avg().show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|  teenager|    13.0|
|     child|     3.0|
+----------+--------+



### DataFrame API reference: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

In [12]:
# Three (age, name, state) records; note this rebinds `df`.
person_rows = [
    (14, "Tom", "CA"),
    (23, "Alice", "NY"),
    (16, "Bob", "TX"),
]
df = spark.createDataFrame(person_rows, schema=["age", "name", "state"])
df.columns

['age', 'name', 'state']

In [13]:
# Print the three-row (age, name, state) frame.
df.show()

+---+-----+-----+
|age| name|state|
+---+-----+-----+
| 14|  Tom|   CA|
| 23|Alice|   NY|
| 16|  Bob|   TX|
+---+-----+-----+



In [14]:
# Every column name except "age"; the loop variable avoids reusing the name
# of the imported `col` function.
selected_cols = [name for name in df.columns if name != "age"]
df.select(*selected_cols).show()

+-----+-----+
| name|state|
+-----+-----+
|  Tom|   CA|
|Alice|   NY|
|  Bob|   TX|
+-----+-----+



In [15]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

# Small frames for the join examples; `df` is rebound once more here.
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

height_rows = [Row(height=80, name="Tom"), Row(height=85, name="Bob")]
df2 = spark.createDataFrame(height_rows)

age_rows = [Row(age=2, name="Alice"), Row(age=5, name="Bob")]
df3 = spark.createDataFrame(age_rows)

# Same fields as above but with missing values in some rows.
sparse_rows = [
    Row(age=10, height=80, name="Alice"),
    Row(age=5, height=None, name="Bob"),
    Row(age=None, height=None, name="Tom"),
    Row(age=None, height=None, name=None),
]
df4 = spark.createDataFrame(sparse_rows)

In [16]:
# Print the two-row (age, name) frame built from Row objects.
df3.show()

+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+



In [17]:
# Join on the shared "name" column; "Bob" is the only name present in both
# df and df2.
joined = df.join(df2, on='name')
joined.select(df.name, df.age, df2.height).show()

+----+---+------+
|name|age|height|
+----+---+------+
| Bob|  5|    85|
+----+---+------+



In [18]:
# Outer join on equal names, then list name/height sorted by name descending.
outer_joined = df.join(df2, on=df.name == df2.name, how='outer')
outer_joined.select(df.name, df2.height).sort(desc("name")).show()

+-----+------+
| name|height|
+-----+------+
|  Bob|    85|
|Alice|  NULL|
| NULL|    80|
+-----+------+



In [19]:
# Outer join that requires both name and age to match; df and df3 hold the
# same two (age, name) pairs.
join_keys = [df.name == df3.name, df.age == df3.age]
df.join(df3, join_keys, 'outer').select(df.name, df3.age).show()

+-----+---+
| name|age|
+-----+---+
|Alice|  2|
|  Bob|  5|
+-----+---+



In [20]:
from pyspark.sql import DataFrameReader

In [21]:
filename = 'ca-500.csv'
# Semicolon-separated file; header=True takes the column names from line one.
df = spark.read.csv(filename, header=True, sep=';')
df.select('first_name', 'last_name', 'city').show()

+----------+-------------+-------------+
|first_name|    last_name|         city|
+----------+-------------+-------------+
| Francoise|Rautenstrauch|      Windsor|
|    Kendra|         Loud|       Alcida|
|   Lourdes|     Bauswell|   Belleville|
|    Hannah|      Edmison|    Vancouver|
|       Tom|        Loeza|  LIle-Perrot|
|   Queenie|   Kramarczyk|Swift Current|
|       Hui|      Portaro|  Baker Brook|
|    Josefa|        Opitz|        Delhi|
|       Lea|    Steinhaus|     Bradford|
|     Paola|       Vielma|       Aurora|
| Hortencia|      Bresser|New Waterford|
|    Leanna|     Tijerina|   North York|
|    Danilo|        Pride|     Red Deer|
|      Huey|     Marcille|     Edmonton|
|  Apolonia|        Warne|  Fredericton|
|   Chandra|        Lagos|    Etobicoke|
|    Crissy|     Pacholec|       Barrie|
|    Gianna|       Branin|      Calgary|
|  Valentin|        Billa|      Pangman|
|     Ilona|       Dudash|Rouyn-Noranda|
+----------+-------------+-------------+
only showing top

## Count words

In [7]:
filename = 'basiswoorden-gekeurd.txt'
# Load the file into a single 'words' column (presumably one entry per line).
# Every value is read as a literal string: with pandas' default NA detection,
# entries such as "NA", "null" or "nan" would be converted to missing values
# (float NaN) instead of being kept as words.
df = pd.read_csv(filename, names=['words'], dtype=str, na_filter=False)

In [8]:
# Plain Python list of the entries in the 'words' column.
words = df['words'].tolist()

In [9]:
# Number of entries loaded from the word list.
len(words)

199403

In [12]:
# Peek at 10 entries drawn at random without replacement; the draw is
# unseeded, so the output changes from run to run.
random.sample(words, k=10)

['cacaohaven',
 'afstoppen',
 'maniak',
 'afmeten',
 'vreemdelingendetentie',
 'spuitomruil',
 'herpositioneren',
 'dossiermanager',
 'ongeborene',
 'terrasscheiding']

In [13]:
# Seeded, private generator: the benchmark input is identical on every run
# (and on every re-run of this cell), so the counts produced by the different
# methods below can be compared with one another.
rng = random.Random(42)
# Pick 50,000 entries without replacement, then draw 100,000 times with
# replacement from that pool, so some entries necessarily repeat.
sample_words = rng.sample(words, k=50000)
selected_words = rng.choices(sample_words, k=100000)

In [14]:
# Size of the benchmark input (the number of draws requested above).
len(selected_words)

100000

In [15]:
# First ten draws of the benchmark input.
selected_words[:10]

['stroomversnelling',
 'weeroverzicht',
 'kol nidrei',
 'comedyreeks',
 'wegsnijden',
 'formatieoverleg',
 'kostenheffing',
 'dressman',
 'onintuïtief',
 'grootspraak']

### Python dict using count method

In [30]:
%%time
counts = {}
for word in set(selected_words):
    counts[word] = selected_words.count(word)

CPU times: user 40.9 s, sys: 93.2 ms, total: 41 s
Wall time: 41 s


In [255]:
# Number of distinct words counted.
len(counts)

43199

In [256]:
# First ten (word, count) pairs in the dict's iteration order.
list(counts.items())[:10]

[('meson', 1),
 ('Statenvergadering', 2),
 ('volksgezondheidsbeleid', 1),
 ('openluchtmissen', 3),
 ('kweekconditie', 4),
 ('ticketshop', 2),
 ('personeelsaankoop', 1),
 ('dakplaat', 2),
 ('bambino', 1),
 ('zondagochtendnieuwsbrief', 5)]

### Python dict using running totals

In [16]:
%%time
counts = {}
for word in selected_words:
    counts[word] = counts.get(word, 0) + 1

CPU times: user 32.5 ms, sys: 3.15 ms, total: 35.6 ms
Wall time: 34.3 ms


In [17]:
# The running-totals result is a plain dict.
type(counts)

dict

In [18]:
# Number of distinct words counted.
len(counts)

43153

In [19]:
# First ten (word, count) pairs; keys appear in first-seen order because the
# dict was filled while iterating over selected_words.
list(counts.items())[:10]

[('stroomversnelling', 2),
 ('weeroverzicht', 2),
 ('kol nidrei', 4),
 ('comedyreeks', 4),
 ('wegsnijden', 6),
 ('formatieoverleg', 1),
 ('kostenheffing', 1),
 ('dressman', 5),
 ('onintuïtief', 2),
 ('grootspraak', 2)]

### Python using collections.Counter

In [20]:
%%time
counts = Counter(selected_words)

CPU times: user 23 ms, sys: 3.84 ms, total: 26.8 ms
Wall time: 37.8 ms


In [36]:
# This time the result is a collections.Counter.
type(counts)

collections.Counter

In [37]:
# Number of distinct words counted.
len(counts)

43239

In [38]:
# First ten (word, count) pairs in the Counter's iteration order.
list(counts.items())[:10]

[('vulpasta', 3),
 ('rijgschoen', 8),
 ('aalkuip', 2),
 ('introductiekoers', 3),
 ('behandeladvies', 1),
 ('weekoverzicht', 3),
 ('mee-eten', 2),
 ('vroeggeboren', 2),
 ('verkoeling', 4),
 ('terpostbezorging', 5)]

### Pandas using value_counts

In [21]:
# One-column pandas frame ('words') holding the benchmark input.
df_selected_words = pd.Series(selected_words, name='words').to_frame()

In [22]:
%%time
counts = df_selected_words['words'].value_counts(sort=False)

CPU times: user 30.5 ms, sys: 1.39 ms, total: 31.9 ms
Wall time: 32 ms


In [23]:
# value_counts returns a pandas Series.
type(counts)

pandas.core.series.Series

In [24]:
# Number of distinct words counted.
len(counts)

43153

In [26]:
# First ten entries of the Series (index = word, value = count).
counts.head(10)

words
stroomversnelling    2
weeroverzicht        2
kol nidrei           4
comedyreeks          4
wegsnijden           6
formatieoverleg      1
kostenheffing        1
dressman             5
onintuïtief          2
grootspraak          2
Name: count, dtype: int64

### PySpark

In [27]:
# Turn the pandas frame of selected words into a Spark DataFrame.
df_spark = spark.createDataFrame(df_selected_words)

In [28]:
# Confirm the conversion produced a Spark DataFrame.
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [29]:
%%time
counts = df_spark.groupBy('words').count()

CPU times: user 2.04 ms, sys: 3.11 ms, total: 5.15 ms
Wall time: 54 ms


In [47]:
# The grouped count is itself a Spark DataFrame.
type(counts)

pyspark.sql.dataframe.DataFrame

In [48]:
# Number of rows in the grouped result, i.e. one per distinct word.
counts.count()

43239

In [50]:
# Print ten rows of the (words, count) result.
counts.limit(10).show()

+------------------+-----+
|             words|count|
+------------------+-----+
|kredietkaartnummer|    4|
|         koopavond|    2|
|         weckketel|    2|
|          balklaag|    1|
|         inkerving|    3|
|        in gebreke|    1|
|        NAVO-zijde|    1|
|         bleekneus|    1|
|   expeditieknecht|    2|
|   landbouwfunctie|    3|
+------------------+-----+

