# PySpark Basic Functions

Install Spark and PySpark if they are not already installed.

In [None]:
## Uncomment the lines below to install the packages imported by this notebook.
# NOTE(review): the PyPI package named `spark` looks unrelated to Apache Spark,
# and `pyspark` already ships with Spark itself — confirm `spark` is really needed.

#!pip install spark
#!pip install pyspark

In [1]:
import spark
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.types as t
from pyspark import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


In [2]:
# Suppress Python warnings for the rest of the session.
# NOTE: "ignore" with no category or message filter silences every warning,
# deprecation notices included — narrow the filter if those should stay visible.

import warnings
warnings.filterwarnings("ignore")


In [3]:
# Start (or reuse) the Spark session for this notebook.
# NOTE: despite its name, `spark_context` holds a SparkSession; the name is
# kept unchanged for backward compatibility.
spark_context = SparkSession.builder.getOrCreate()

# SQLContext expects a SparkContext as its first argument, not a SparkSession.
# Pass the session's underlying SparkContext, and pass the session itself as the
# second argument so the wrapper reuses it instead of looking up another one.
sqlcontext = SQLContext(spark_context.sparkContext, spark_context)

# Spark's own (JVM) log output is governed by the log level, not by Python's
# warnings module; only show errors to keep the cell output clean.
spark_context.sparkContext.setLogLevel("ERROR")

### Create Sample Dataframes

In [4]:
# Build the user DataFrame (df1).

# Column layout as (name, Spark type) pairs; every column is nullable.
user_columns = [
    ("id", t.StringType()),
    ("town", t.StringType()),
    ("salary", t.IntegerType()),
    ("texting", t.StringType()),
    ("start_week", t.StringType()),
]
schema = t.StructType(
    [t.StructField(col_name, col_type, True) for col_name, col_type in user_columns]
)

# One tuple per user, fields in schema order.
data = [
    ('190', 'Leiden', 2166, 'I like my ring', '2019_01'),
    ('99', 'Amersfoort', 9211, 'Why are you glaring at me?', '2019_05'),
    ('329', 'Leiden', 8794, 'Singing in the rain', '2019_08'),
    ('294', 'Woerden', 5813, 'Christmas was fairylike', '2019_02'),
    ('199', 'Amersfoort', 2365, 'A ring for my wife', '2019_10'),
    ('904', 'Delft', 2921, 'I need some fresh air', '2019_32'),
    ('242', 'Amersfoort', 2635, 'I am browsing the internet', '2019_25'),
    ('367', 'Leiden', 1947, 'The airfares are very expensive', '2019_44'),
    ('357', 'Dordrecht', 4899, 'The bell is ringing', '2019_28'),
    ('278', 'Woerden', 4495, 'The air is very poluted', '2019_04'),
    ('102', 'Leiden', 4907, 'I would love to sing', '2019_18'),
    ('490', 'Woerden', 1778, 'I need a new hairbrush', '2019_37'),
    ('711', 'Amersfoort', 5397, 'Why am I singled out?', '2019_21'),
    ('796', 'Delft', 6874, 'Love is in the air', '2019_15'),
    ('39', 'Leiden', 5628, 'This food is mouthwatering', '2019_07'),
    ('953', 'Delft', 1395, 'You are singing', '2019_11'),
    ('55', 'Leiden', 9403, 'His wife has an affair', '2019_35'),
    ('70', 'Dordrecht', 5500, 'I love to sing in the rain', '2019_14'),
    ('33', 'Amersfoort', 4500, 'It is great to breath some fresh air', '2019_23'),
    ('100', 'Woerden', 4406, 'Why are you refusing?', '2019_09'),
]

# Materialise the rows with the explicit schema and display them untruncated.
df1 = sqlcontext.createDataFrame(data, schema=schema)
df1.show(truncate=False)

+---+----------+------+------------------------------------+----------+
|id |town      |salary|texting                             |start_week|
+---+----------+------+------------------------------------+----------+
|190|Leiden    |2166  |I like my ring                      |2019_01   |
|99 |Amersfoort|9211  |Why are you glaring at me?          |2019_05   |
|329|Leiden    |8794  |Singing in the rain                 |2019_08   |
|294|Woerden   |5813  |Christmas was fairylike             |2019_02   |
|199|Amersfoort|2365  |A ring for my wife                  |2019_10   |
|904|Delft     |2921  |I need some fresh air               |2019_32   |
|242|Amersfoort|2635  |I am browsing the internet          |2019_25   |
|367|Leiden    |1947  |The airfares are very expensive     |2019_44   |
|357|Dordrecht |4899  |The bell is ringing                 |2019_28   |
|278|Woerden   |4495  |The air is very poluted             |2019_04   |
|102|Leiden    |4907  |I would love to sing                |2019

In [5]:
# Print df1's column names, types and nullability.
df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- town: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- texting: string (nullable = true)
 |-- start_week: string (nullable = true)



In [6]:
# Build the second DataFrame (df2): one row per id and day with a price and a
# unit count.

# Nullable columns, in the order the tuples below are written.
daily_fields = [
    t.StructField("id", t.StringType(), True),
    t.StructField("day", t.StringType(), True),
    t.StructField("price", t.IntegerType(), True),
    t.StructField("units", t.IntegerType(), True),
]
schema = t.StructType(daily_fields)

data2 = [
    # id 100
    ('100', '1', 23, 10),
    ('100', '2', 45, 11),
    ('100', '3', 67, 12),
    ('100', '4', 78, 13),
    # id 101
    ('101', '1', 23, 10),
    ('101', '2', 45, 13),
    ('101', '3', 67, 14),
    ('101', '4', 78, 15),
    # id 102
    ('102', '1', 23, 10),
    ('102', '2', 45, 11),
    ('102', '3', 67, 16),
    ('102', '4', 78, 18),
]

# Materialise the rows with the explicit schema and display them untruncated.
df2 = sqlcontext.createDataFrame(data2, schema=schema)
df2.show(truncate=False)

+---+---+-----+-----+
|id |day|price|units|
+---+---+-----+-----+
|100|1  |23   |10   |
|100|2  |45   |11   |
|100|3  |67   |12   |
|100|4  |78   |13   |
|101|1  |23   |10   |
|101|2  |45   |13   |
|101|3  |67   |14   |
|101|4  |78   |15   |
|102|1  |23   |10   |
|102|2  |45   |11   |
|102|3  |67   |16   |
|102|4  |78   |18   |
+---+---+-----+-----+



In [7]:
# Print df2's column names, types and nullability.
df2.printSchema()

root
 |-- id: string (nullable = true)
 |-- day: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- units: integer (nullable = true)



### Aggregation

In [8]:
# Salary statistics per town: minimum, maximum and mean (rounded to 2 decimals).
salary_aggregates = [
    f.min('salary').alias('min_salary'),
    f.max('salary').alias('max_salary'),
    f.round(f.avg('salary'), 2).alias('avg_salary'),
]
stats = df1.groupBy('town').agg(*salary_aggregates)

stats.show()

+----------+----------+----------+----------+
|      town|min_salary|max_salary|avg_salary|
+----------+----------+----------+----------+
|    Leiden|      1947|      9403|   5474.17|
|Amersfoort|      2365|      9211|    4821.6|
|   Woerden|      1778|      5813|    4123.0|
|     Delft|      1395|      6874|    3730.0|
| Dordrecht|      4899|      5500|    5199.5|
+----------+----------+----------+----------+



### Window Function

In [9]:
# Running (cumulative) average of salary per town, ordered by start week.
# The window is ordered but has no explicit frame, so each row is averaged
# together with all earlier rows of the same town (first row = its own salary,
# last row = the town's overall mean).

w = Window.partitionBy('town').orderBy('start_week')

running_mean = f.avg('salary').over(w)
rolling_avg = df1.withColumn('rolling_average', f.round(running_mean, 1))

rolling_avg.show(truncate=False)

+---+----------+------+------------------------------------+----------+---------------+
|id |town      |salary|texting                             |start_week|rolling_average|
+---+----------+------+------------------------------------+----------+---------------+
|99 |Amersfoort|9211  |Why are you glaring at me?          |2019_05   |9211.0         |
|199|Amersfoort|2365  |A ring for my wife                  |2019_10   |5788.0         |
|711|Amersfoort|5397  |Why am I singled out?               |2019_21   |5657.7         |
|33 |Amersfoort|4500  |It is great to breath some fresh air|2019_23   |5368.3         |
|242|Amersfoort|2635  |I am browsing the internet          |2019_25   |4821.6         |
|953|Delft     |1395  |You are singing                     |2019_11   |1395.0         |
|796|Delft     |6874  |Love is in the air                  |2019_15   |4134.5         |
|904|Delft     |2921  |I need some fresh air               |2019_32   |3730.0         |
|70 |Dordrecht |5500  |I love to

In [10]:
# Number the rows inside each town (1, 2, 3, ...) using the partitioning and
# ordering of window `w`.
row_index = f.row_number().over(w)
count_rows = df1.withColumn("row_num", row_index)

count_rows.show()

+---+----------+------+--------------------+----------+-------+
| id|      town|salary|             texting|start_week|row_num|
+---+----------+------+--------------------+----------+-------+
| 99|Amersfoort|  9211|Why are you glari...|   2019_05|      1|
|199|Amersfoort|  2365|  A ring for my wife|   2019_10|      2|
|711|Amersfoort|  5397|Why am I singled ...|   2019_21|      3|
| 33|Amersfoort|  4500|It is great to br...|   2019_23|      4|
|242|Amersfoort|  2635|I am browsing the...|   2019_25|      5|
|953|     Delft|  1395|     You are singing|   2019_11|      1|
|796|     Delft|  6874|  Love is in the air|   2019_15|      2|
|904|     Delft|  2921|I need some fresh...|   2019_32|      3|
| 70| Dordrecht|  5500|I love to sing in...|   2019_14|      1|
|357| Dordrecht|  4899| The bell is ringing|   2019_28|      2|
|190|    Leiden|  2166|      I like my ring|   2019_01|      1|
| 39|    Leiden|  5628|This food is mout...|   2019_07|      2|
|329|    Leiden|  8794| Singing in the r

### Regular Expressions

In [11]:
# Keep the rows whose text contains any of the search terms. The terms are
# joined into a single regex alternation and matched anywhere in the
# lower-cased text, so they also hit inside longer words
# (e.g. 'air' in 'hairbrush').
terms_list = ['ring', 'sing', 'air']
terms = '|'.join(terms_list)

text_lower = f.lower(f.col('texting'))
search_terms = df1.where(text_lower.rlike(terms)).select('id', 'texting')
n_terms = search_terms.count()

search_terms.show(truncate=False)
print(f"Number of rows: {n_terms}")

+---+------------------------------------+
|id |texting                             |
+---+------------------------------------+
|190|I like my ring                      |
|99 |Why are you glaring at me?          |
|329|Singing in the rain                 |
|294|Christmas was fairylike             |
|199|A ring for my wife                  |
|904|I need some fresh air               |
|242|I am browsing the internet          |
|367|The airfares are very expensive     |
|357|The bell is ringing                 |
|278|The air is very poluted             |
|102|I would love to sing                |
|490|I need a new hairbrush              |
|711|Why am I singled out?               |
|796|Love is in the air                  |
|39 |This food is mouthwatering          |
|953|You are singing                     |
|55 |His wife has an affair              |
|70 |I love to sing in the rain          |
|33 |It is great to breath some fresh air|
|100|Why are you refusing?               |
+---+------

In [12]:
#exact match

def exact_match(items):
    """Wrap each term in word boundaries so it only matches as a whole word.

    Every term is regex-escaped first, so metacharacters inside a term
    (for example '.', '+' or '(') are matched literally instead of being
    interpreted as pattern syntax.

    Args:
        items: iterable of search terms (strings).

    Returns:
        A list with one pattern string per term, in input order, each of the
        form ``\\b<escaped term>\\b``.
    """
    import re  # local import keeps this cell runnable on its own

    return [f"\\b{re.escape(item)}\\b" for item in items]

# Join the word-bounded patterns into one alternation and keep the rows whose
# lower-cased text contains at least one term as a whole word.
exact_match_terms_list = exact_match(terms_list)
exact_match_terms = '|'.join(exact_match_terms_list)

whole_word_hit = f.lower(f.col('texting')).rlike(exact_match_terms)
search_terms2 = df1.where(whole_word_hit)
n_terms_exact_match = search_terms2.count()

search_terms2.show(truncate=False)
print(f"Number of rows: {n_terms_exact_match}")


+---+----------+------+------------------------------------+----------+
|id |town      |salary|texting                             |start_week|
+---+----------+------+------------------------------------+----------+
|190|Leiden    |2166  |I like my ring                      |2019_01   |
|199|Amersfoort|2365  |A ring for my wife                  |2019_10   |
|904|Delft     |2921  |I need some fresh air               |2019_32   |
|278|Woerden   |4495  |The air is very poluted             |2019_04   |
|102|Leiden    |4907  |I would love to sing                |2019_18   |
|796|Delft     |6874  |Love is in the air                  |2019_15   |
|70 |Dordrecht |5500  |I love to sing in the rain          |2019_14   |
|33 |Amersfoort|4500  |It is great to breath some fresh air|2019_23   |
+---+----------+------+------------------------------------+----------+

Number of rows: 8


In [13]:
# Keep the rows whose text starts with the standalone word "I": the pattern
# anchors an 'i' at the start of the lower-cased text and requires a word
# boundary right after it, so "I like ..." matches but "It is ..." does not.
term_i = '^i\\b'

starts_with_i = f.lower(f.col('texting')).rlike(term_i)
search_terms3 = df1.where(starts_with_i)
n_terms_i = search_terms3.count()

search_terms3.show()
print(f"Number of rows: {n_terms_i}")


+---+----------+------+--------------------+----------+
| id|      town|salary|             texting|start_week|
+---+----------+------+--------------------+----------+
|190|    Leiden|  2166|      I like my ring|   2019_01|
|904|     Delft|  2921|I need some fresh...|   2019_32|
|242|Amersfoort|  2635|I am browsing the...|   2019_25|
|102|    Leiden|  4907|I would love to sing|   2019_18|
|490|   Woerden|  1778|I need a new hair...|   2019_37|
| 70| Dordrecht|  5500|I love to sing in...|   2019_14|
+---+----------+------+--------------------+----------+

Number of rows: 6


### Pivot tables

In [14]:
# Reshape df2 to one row per id with one column per day; each cell holds the
# first price found for that (id, day) pair.
prices_by_id = df2.groupBy("id")
pivot_data = prices_by_id.pivot("day").agg(f.first("price"))

pivot_data.show()



+---+---+---+---+---+
| id|  1|  2|  3|  4|
+---+---+---+---+---+
|101| 23| 45| 67| 78|
|100| 23| 45| 67| 78|
|102| 23| 45| 67| 78|
+---+---+---+---+---+

