In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
## Inside docker we do not need to use findspark
# import findspark
# findspark.init('/home/fdelca/Documents/spark/spark-2.1.2-bin-hadoop2.7')

import pyspark
import os

In [3]:
! pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252
Branch HEAD
Compiled by user centos on 2020-02-02T19:38:06Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


In [4]:
! python --version

Python 3.7.6


### Start a spark session

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

22/05/26 10:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Spark DataFrames - Lesson 1

### Read Data

In [7]:
df = spark.read.json(os.path.join('data','people.json'))

In [8]:
# Show the dataframe
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
# Know the schema
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



Depending on your datasource, spark can dtype everything as a string - be caution with that

In [10]:
df.columns

['age', 'name']

In [11]:
# Statistical summary of your columns
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



### Define Schema
- Sometimes we need to clarify the datatypes

In [12]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [13]:
data_schema = [
    StructField('age', IntegerType(), True), #Column name, Class instance (integer type), T/F can it be null?
    StructField('name', StringType(), True),
]

In [14]:
final_struc = StructType(fields=data_schema)

In [15]:
df = spark.read.json(os.path.join('data','people.json'), schema=final_struc)

In [16]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



### Data Manipulations

#### Select Data

In [17]:
# Get a column object
type(df['age'])

pyspark.sql.column.Column

In [18]:
# With select we can have a dataframe, and then we can add show() to see the column
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [19]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [20]:
# Select the first two rows
df.head(2)[0]

Row(age=None, name='Michael')

In [21]:
df.select(['age', 'name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Create a dataframe with a new column

In [22]:
df.withColumn('double_age', df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [23]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



We need to save it to a new variable, it is not an inplace operation

#### Rename a column

In [24]:
# Rename a column
df.withColumnRenamed('age', 'my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



### Using Pure SQL

1. Register the df as a temporary view in SQL 

In [25]:
df.createOrReplaceTempView('people')

In [26]:
results = spark.sql("SELECT * FROM people")

In [27]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [28]:
new_results = spark.sql("SELECT * FROM people WHERE age=30")

In [29]:
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## Spark DataFrames - Lesson 2

In [30]:
df = spark.read.csv(os.path.join('data', 'appl_stock.csv'), inferSchema=True, header=True)

In [31]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [32]:
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

### Filter Data

In [36]:
# One can use filter and select together, to make two different operations
df.filter("Close < 500").select(["Open", "Close"]).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [38]:
df.filter(df['Close'] < 500).select('Volume').show()

+---------+
|   Volume|
+---------+
|123432400|
|150476200|
|138040000|
|119282800|
|111902700|
|115557400|
|148614900|
|151473000|
|108223500|
|148516900|
|182501900|
|153038200|
|152038600|
|220441900|
|266424900|
|466777500|
|430642100|
|293375600|
|311488100|
|187469100|
+---------+
only showing top 20 rows



### Filter Data - Multiple Conditions

In [42]:
# filter method works similar to python
df.filter((df['Close'] < 200) & (df['Open']>200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



#### Use `collect()`

When using `collect()` we got a list with the values of the filter dataframe

In [44]:
# Show()
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [46]:
# Collect() - we can work with this variable
result = df.filter(df['Low'] == 197.16).collect()
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [50]:
row = result[0]
row

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [53]:
# We can transform it into a dictionary
display(row.asDict())
# And select the value of interest
row.asDict()['Volume']

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

220441900

## GroupBy and Aggregate - Lesson 3

In [55]:
df = spark.read.csv(os.path.join('data','sales_info.csv'), inferSchema=True, header=True)

In [56]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



### GroupBy a column

In [65]:
# mean(), max(), min(), sum(), count() [how many rows] - can be used in this method
df.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



### Aggregate

In [67]:
# Similar to python function
df.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [76]:
from pyspark.sql import functions

In [86]:
group_data = df.groupby('Company')

group_data.agg(functions.min('Sales').alias('Min Sales'), 
               functions.max('Sales').alias('Max Sales'), 
               functions.avg('Sales').alias('Avg Sales'),
               functions.stddev('Sales').alias('Stdv Sales'),
               functions.countDistinct('Person').alias('Sellers'),
              ).show()



+-------+---------+---------+-----------------+------------------+-------+
|Company|Min Sales|Max Sales|        Avg Sales|        Stdv Sales|Sellers|
+-------+---------+---------+-----------------+------------------+-------+
|   APPL|    130.0|    750.0|            370.0|  268.824602048746|      4|
|   GOOG|    120.0|    340.0|            220.0|111.35528725660043|      3|
|     FB|    350.0|    870.0|            610.0| 367.6955262170047|      2|
|   MSFT|    124.0|    600.0|322.3333333333333| 247.7182539364698|      3|
+-------+---------+---------+-----------------+------------------+-------+



In [88]:
# With select one can apply to only one colummn

df.select(functions.countDistinct('Sales').alias('Average Sales')).show()

+-------------+
|Average Sales|
+-------------+
|           11|
+-------------+



- **alias()** method allow us to give a name to the calculated column

#### Format Numbers

In [90]:
from pyspark.sql.functions import format_number

In [93]:
sales_std = df.select(functions.stddev('Sales').alias('std'))

In [95]:
sales_std.select(functions.format_number('std', 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [96]:
# Shortest way to get the same result
df.select(
    functions.stddev('Sales').alias('std')
).select(
    functions.format_number('std', 2).alias('std')
).show()

+------+
|   std|
+------+
|250.09|
+------+



### Order by

In [97]:
# Ascending
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [98]:
# Descending - more work - maybe in spark 3 is already solved
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Missing Data - Lesson 4

In [99]:
df = spark.read.csv(os.path.join('data', 'ContainsNull.csv'), header=True, inferSchema=True)
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [100]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



The function that deals with missing data is called `na` and it has different methods:
- `drop()`

In [103]:
# thresh : allows you to drop only rows that respect a certain argument
    # In the example below, the row needs to have at least 2 null values to be dropped
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [106]:
# how : 'all', 'any'
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



- `fill()`

In [107]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



**Spark** is intelligence enough to fill only the columns that have the same type as the value the user is asking to fill in

In [109]:
df.na.fill('FILLVALUE').show() # There was no need to specify the column

+----+---------+-----+
|  Id|     Name|Sales|
+----+---------+-----+
|emp1|     John| null|
|emp2|FILLVALUE| null|
|emp3|FILLVALUE|345.0|
|emp4|    Cindy|456.0|
+----+---------+-----+



In [110]:
df.na.fill(0).show() # There was no need to specify the column

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



But the user can always specify the column that wants to be filled

In [111]:
df.na.fill('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [112]:
from pyspark.sql import functions

In [113]:
mean_val = df.select(functions.mean(df['Sales'])).collect()

In [123]:
# mean_sales = mean_val[0].asDict()['avg(Sales)']
mean_sales = mean_val[0][0]
mean_sales

400.5

In [124]:
df.na.fill(mean_sales, ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [126]:
df.na.fill(
    df.select(functions.mean(df['Sales'])).collect()[0][0],
    ['Sales']
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## Dates and TimeStamps - Lesson 5

In [127]:
df = spark.read.csv(os.path.join('data', 'appl_stock.csv'), inferSchema=True, header=True)
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [130]:
df.select(['Date', 'Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top

In [131]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month,
                                   year, weekofyear, format_number, date_format)

**Hour/Month/Dayofmonth**

In [135]:
# df.select(dayofmonth(df['Date'])).show()
# df.select(hour(df['Date'])).show()
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



#### Average closing price per year

In [148]:
newdf = df.withColumn(colName='Year', col=year(df['Date']))

In [160]:
newdf.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039, Year=2010)]

In [154]:
result = newdf.groupBy('Year').agg(functions.avg('Close').alias('Avg Closing Price'))

In [159]:
result.select(['Year', format_number('Avg Closing Price', 2).alias('Avg Closing Price')]).show()

+----+-----------------+
|Year|Avg Closing Price|
+----+-----------------+
|2015|           120.04|
|2013|           472.63|
|2014|           295.40|
|2012|           576.05|
|2016|           104.60|
|2010|           259.84|
|2011|           364.00|
+----+-----------------+

