# Spark and PySpark Udemy Course

## Introduction

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("basics").getOrCreate()

24/06/05 20:02:37 WARN Utils: Your hostname, agusrichard.local resolves to a loopback address: 127.0.0.1; using 192.168.0.103 instead (on interface en0)
24/06/05 20:02:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/05 20:02:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# No schema is supplied here, so every column (including sales) is loaded as a string.
df = spark.read.csv("./files/mydata.csv", header=True)

                                                                                

In [4]:
df.show()

+-----+-----+
| name|sales|
+-----+-----+
| John|  100|
|Cindy|   86|
| Carl|  230|
+-----+-----+


In [5]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- sales: string (nullable = true)


In [6]:
# 'sales' was read as a string (see printSchema above), so the min/max rows
# below are compared lexicographically: "100" sorts before "86".
df.describe().show()

                                                                                

+-------+----+------------------+
|summary|name|             sales|
+-------+----+------------------+
|  count|   3|                 3|
|   mean|NULL|138.66666666666666|
| stddev|NULL| 79.40612906654835|
|    min|Carl|               100|
|    max|John|                86|
+-------+----+------------------+


In [7]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType
)

In [8]:
# Column definitions as (name, Spark type) pairs; every column is nullable.
columns_schema = [
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('name', StringType()),
        ('sales', IntegerType()),
    )
]

In [9]:
schema = StructType(fields=columns_schema)

In [10]:
df = spark.read.csv("./files/mydata.csv", header=True, schema=schema)

In [11]:
df.show()

+-----+-----+
| name|sales|
+-----+-----+
| John|  100|
|Cindy|   86|
| Carl|  230|
+-----+-----+


## Select columns

In [13]:
df.select('sales').show()

+-----+
|sales|
+-----+
|  100|
|   86|
|  230|
+-----+


In [14]:
df.select(['name', 'sales']).show()

+-----+-----+
| name|sales|
+-----+-----+
| John|  100|
|Cindy|   86|
| Carl|  230|
+-----+-----+


## Create a new column

In [15]:
df_new = df.withColumn('double_sales', df['sales']*2)
df_new.show()

+-----+-----+------------+
| name|sales|double_sales|
+-----+-----+------------+
| John|  100|         200|
|Cindy|   86|         172|
| Carl|  230|         460|
+-----+-----+------------+


In [25]:
sales_average = df.agg({'sales': 'mean'}).collect()

In [26]:
# Rebind sales_average from the collected list of Rows to the scalar mean.
# Re-running only this cell fails once the name holds a float; re-run the
# previous cell first.
sales_average = sales_average[0].asDict()['avg(sales)']

In [27]:
sales_average

138.66666666666666

In [28]:
# Flag rows whose sales exceed the previously computed mean. The column is
# referenced through df_new itself so the expression does not depend on a
# different DataFrame object.
df_new = df_new.withColumn('greater_than_mean_sales', df_new['sales'] > sales_average)

In [29]:
df_new.show()

+-----+-----+------------+-----------------------+
| name|sales|double_sales|greater_than_mean_sales|
+-----+-----+------------+-----------------------+
| John|  100|         200|                  false|
|Cindy|   86|         172|                  false|
| Carl|  230|         460|                   true|
+-----+-----+------------+-----------------------+


## Use SQL query to select data

In [30]:
df.createOrReplaceTempView('sales')

In [31]:
df_result = spark.sql("SELECT * FROM sales")
df_result.show()

+-----+-----+
| name|sales|
+-----+-----+
| John|  100|
|Cindy|   86|
| Carl|  230|
+-----+-----+


                                                                                

In [32]:
spark.sql("SELECT * FROM sales WHERE sales > 100").show()

+----+-----+
|name|sales|
+----+-----+
|Carl|  230|
+----+-----+


## Filtering data

In [33]:
# A session already exists in this kernel, so getOrCreate() returns it; the
# warning below shows the existing session is being reused.
spark = SparkSession.builder.appName("operations").getOrCreate()

24/06/05 20:12:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [34]:
df = spark.read.csv("./files/appl_stock.csv", header=True, inferSchema=True)

In [35]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)


In [37]:
df.filter("Close < 500").show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [40]:
df.filter("Open > 500").select(["Date", "Open", "High"]).show()

+----------+------------------+------------------+
|      Date|              Open|              High|
+----------+------------------+------------------+
|2012-02-14|        504.659988|         509.56002|
|2012-02-15|        514.259995|        526.290016|
|2012-02-17|        503.109993|507.77002000000005|
|2012-02-21|506.88001299999996|        514.850021|
|2012-02-22|        513.079994|        515.489983|
|2012-02-23|        515.079987|        517.830009|
|2012-02-24| 519.6699980000001|        522.899979|
|2012-02-27|        521.309982|             528.5|
|2012-02-28|        527.960014|        535.410011|
|2012-02-29| 541.5600049999999| 547.6100230000001|
|2012-03-01|        548.169983|        548.209984|
|2012-03-02|        544.240013|        546.800018|
|2012-03-05|        545.420013|         547.47998|
|2012-03-06|        523.659996|        533.690025|
|2012-03-07| 536.8000030000001|        537.779999|
|2012-03-08| 534.6899950000001|        542.989998|
|2012-03-09|        544.209999|

In [42]:
df.filter(df["Open"] > 500).select(["Date", "Open", "Close", "High"]).show()

[Stage 27:>                                                         (0 + 1) / 1]

+----------+------------------+-----------------+------------------+
|      Date|              Open|            Close|              High|
+----------+------------------+-----------------+------------------+
|2012-02-14|        504.659988|       509.459991|         509.56002|
|2012-02-15|        514.259995|       497.669975|        526.290016|
|2012-02-17|        503.109993|        502.12001|507.77002000000005|
|2012-02-21|506.88001299999996|       514.850021|        514.850021|
|2012-02-22|        513.079994|       513.039993|        515.489983|
|2012-02-23|        515.079987|516.3899769999999|        517.830009|
|2012-02-24| 519.6699980000001|522.4099809999999|        522.899979|
|2012-02-27|        521.309982|       525.760017|             528.5|
|2012-02-28|        527.960014|       535.410011|        535.410011|
|2012-02-29| 541.5600049999999|       542.440025| 547.6100230000001|
|2012-03-01|        548.169983|544.4699780000001|        548.209984|
|2012-03-02|        544.240013|   

                                                                                

In [44]:
df.filter((df["Open"] > 200) & (df["Close"] < 200)).select(["Date", "Open", "Close", "High"]).show()

+----------+------------------+----------+----------+
|      Date|              Open|     Close|      High|
+----------+------------------+----------+----------+
|2010-01-22|206.78000600000001|    197.75|207.499996|
|2010-01-28|        204.930004|199.289995|205.500004|
|2010-01-29|        201.079996|192.060003|202.199995|
+----------+------------------+----------+----------+


In [45]:
df.filter(df["Low"] == 197.16).show()

+----------+------------------+----------+------+------+---------+---------+
|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+----------+------------------+----------+------+------+---------+---------+
|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+----------+------------------+----------+------+------+---------+---------+


In [46]:
# first() returns the first matching Row (or None when nothing matches)
# instead of collecting every matching row to the driver and indexing into it.
result = df.filter(df["Low"] == 197.16).first()

In [47]:
result.asDict()

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

## Group by and aggregate functions

In [48]:
# getOrCreate() reuses the session that is already running (see the warning below).
spark = SparkSession.builder.appName("aggregate").getOrCreate()
# Load the per-person sales figures, letting Spark infer the column types.
df = spark.read.csv('./files/sales_info.csv', header=True, inferSchema=True)

24/06/05 20:20:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [49]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+


In [50]:
# groupBy returns a GroupedData object (see the repr below), not a DataFrame;
# an aggregation such as mean(), max() or agg() turns it into a DataFrame.
df.groupBy("Company")

GroupedData[grouping expressions: [Company], value: [Company: string, Person: string ... 1 more field], type: GroupBy]

In [52]:
df.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+


In [53]:
df.groupBy("Company").max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+


In [54]:
df.groupBy("Company").agg({"Sales": "mean", "Person": "count"}).show()

+-------+-----------------+-------------+
|Company|       avg(Sales)|count(Person)|
+-------+-----------------+-------------+
|   APPL|            370.0|            4|
|   GOOG|            220.0|            3|
|     FB|            610.0|            2|
|   MSFT|322.3333333333333|            3|
+-------+-----------------+-------------+


## Aggregate functions

In [60]:
from pyspark.sql.functions import count_distinct, avg, stddev, format_number

In [56]:
df.select(count_distinct('Sales')).show()

[Stage 43:>                                                         (0 + 1) / 1]

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+


                                                                                

In [57]:
df.select(count_distinct('Sales').alias('count_distinct_sales')).show()

+--------------------+
|count_distinct_sales|
+--------------------+
|                  11|
+--------------------+


In [58]:
df.select(avg('Sales').alias('avg_sales')).show()

+-----------------+
|        avg_sales|
+-----------------+
|360.5833333333333|
+-----------------+


In [59]:
df.select(stddev('Sales').alias('std_sales')).show()

+------------------+
|         std_sales|
+------------------+
|250.08742410799007|
+------------------+


In [61]:
# Global mean of Sales as a one-row DataFrame, then rendered with two decimals.
avg_sales = df.agg(avg('Sales').alias('avg_sales'))
avg_sales.select(format_number('avg_sales', 2)).show()

+---------------------------+
|format_number(avg_sales, 2)|
+---------------------------+
|                     360.58|
+---------------------------+


## Order by

In [62]:
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+


In [63]:
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+


## Handle missing data

In [64]:
# getOrCreate() reuses the session that is already running (see the warning below).
spark = SparkSession.builder.appName("missing_data").getOrCreate()
# Load a small dataset that contains missing values in Name and Sales.
df = spark.read.csv("./files/ContainsNull.csv", header=True, inferSchema=True)
df.show()

24/06/05 20:32:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


## Drop missing data

In [66]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+


In [67]:
# thresh=2 keeps only the rows that have at least 2 non-null values
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


In [68]:
df.na.drop(subset=["Sales"]).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


## Fill missing values

In [69]:
# A string fill value is applied only to string columns; the numeric Sales
# column keeps its NULLs, as the output below shows.
df.na.fill('NEW VALUE').show()

+----+---------+-----+
|  Id|     Name|Sales|
+----+---------+-----+
|emp1|     John| NULL|
|emp2|NEW VALUE| NULL|
|emp3|NEW VALUE|345.0|
|emp4|    Cindy|456.0|
+----+---------+-----+


In [70]:
# A numeric fill value is applied only to numeric columns; Name keeps its NULLs.
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| NULL|  0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


In [71]:
df.na.fill(0, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| NULL|  0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


In [75]:
df.na.fill('N/A', subset=['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2|  N/A| NULL|
|emp3|  N/A|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


In [77]:
# Global mean of the non-null Sales values, pulled out as a plain Python float.
# A global aggregate always yields exactly one row, so first() is safe here,
# and the name is bound once instead of being rebound from list to float.
avg_sales = df.select(avg('Sales').alias('avg_sales')).first()['avg_sales']

                                                                                

In [78]:
df.na.fill(avg_sales, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+


## Dates and Timestamps

In [79]:
# getOrCreate() reuses the session that is already running (see the warning below).
spark = SparkSession.builder.appName("dates").getOrCreate()
# Reload the stock prices; inferSchema lets Spark detect the column types.
df = spark.read.csv("./files/appl_stock.csv", header=True, inferSchema=True)
df.show()

24/06/05 20:39:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [80]:
from pyspark.sql.functions import dayofmonth, dayofyear, hour, month, year

In [81]:
df.select(dayofyear('Date')).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+


                                                                                

In [82]:
df = df.withColumn('Year', year(df['Date']))
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

In [84]:
# Average only the two columns that are displayed, per year, instead of
# averaging every numeric column and then discarding most of the results.
df.groupBy('Year').agg(avg('Open'), avg('Close')).show()

+----+------------------+------------------+
|Year|         avg(Open)|        avg(Close)|
+----+------------------+------------------+
|2015|120.17575393253965|120.03999980555547|
|2013| 473.1281355634922| 472.6348802857143|
|2014| 295.1426195357143| 295.4023416507935|
|2012|     576.652720788| 576.0497195640002|
|2016|104.50777772619044|104.60400786904763|
|2010| 259.9576190992064| 259.8424600000002|
|2011|364.06142773412705|364.00432532142867|
+----+------------------+------------------+


                                                                                