# <span class="burk">PySpark DataFrame Master</span>

## Spark DataFrame Basics

Spark DataFrames are the workhorse and main way of working with Spark and Python since Spark 2.0. DataFrames act as powerful versions of tables, with rows and columns, and easily handle large datasets. The shift to DataFrames provides many advantages:
* A much simpler syntax
* The ability to use SQL directly on the DataFrame
* Operations are automatically distributed across the cluster (DataFrames are built on top of RDDs)
    
If you've used R or even the pandas library with Python, you are probably already familiar with the concept of DataFrames. Spark DataFrames expand on a lot of these concepts, so you can transfer that knowledge easily once you understand their simple syntax. Remember that the main advantage of Spark DataFrames over those other tools is that Spark can handle data distributed across many machines: huge datasets that would never fit on a single computer. That comes at the slight cost of some "peculiar" syntax choices, but after this course you will feel very comfortable with all those topics!

In [4]:
# First let's create our PySpark instance
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("master-session").getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

# Note: this counts the block managers registered with the driver, not CPU cores
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
appid = spark._jsc.sc().applicationId()
print("You are working with", cores, "core(s) on appid: ",appid)
spark

You are working with 1 core(s) on appid:  local-1634565969155


## Reading in data in different formats

In [3]:
path ="../Datasets/"

# CSV
df = spark.read.csv(path+'students.csv',inferSchema=True,header=True)

# Json
people = spark.read.json(path+'people.json')

# Parquet
parquet = spark.read.parquet(path+'users1.parquet')

# Partitioned Parquet
partitioned = spark.read.parquet(path+'users*')

# Only some parts of a partitioned Parquet dataset (2 files here)
users1_2 = spark.read.option("basePath", path).parquet(path+'users1.parquet', path+'users2.parquet')

**Parquet Files**

Now try reading in a parquet file. Parquet is one of the most common file formats in the big data world.
Why? Because its columnar layout and built-in compression make it a very compact way to store data (often smaller than zipped text files).

In [4]:
parquet = spark.read.parquet(path+'users1.parquet')
parquet.show(2)

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 08:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 18:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



**Partitioned Parquet Files**

Most big datasets are partitioned. Here is how you can collect all the pieces (parts) of the dataset in one simple command, using a wildcard in the path.

In [5]:
partitioned = spark.read.parquet(path+'users*')
partitioned.show(2)

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 08:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 18:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



<div class="mark">
You can also opt to read in only a specific set of partitioned parquet files. Say, for example, that you only wanted users1 and users2 and not users3.</div><i class="fa fa-lightbulb-o "></i>

In [6]:
# Note: .option("basePath", path) tells Spark where partition discovery starts.
# Without it, partition columns encoded in the directory names are left out
# of the resulting dataframe when you read specific sub-paths.
# I prefer to have the partitioning info in my new dataframe personally. 
users1_2 = spark.read.option("basePath", path).parquet(path+'users1.parquet', path+'users2.parquet')
users1_2.show()

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 08:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 18:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 02:09:31|  3|

### Read data from an S3 bucket

In [None]:

# If you're on AWS and storing data in S3 buckets, your code will look more like this...

bucket = "my_bucket"
key1 = "partition_test/Table1/CREATED_YEAR=2015/*"
key2 = "partition_test/Table1/CREATED_YEAR=2017/*"
key3 = "partition_test/Table1/CREATED_YEAR=2018/*"

test_df = spark.read.parquet('s3://'+bucket+'/'+key1,
                             's3://'+bucket+'/'+key2,
                             's3://'+bucket+'/'+key3)

test_df.show(1)


### Notice the type differences here

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
# Convert it to pandas and assign the result to a new variable

pandaDF = df.toPandas()
type(pandaDF)

pandas.core.frame.DataFrame

## Validate Schema and content at a glance

It is always a good idea to do this to ensure that the dataframe was read in correctly.

In [11]:
df.limit(4).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44


In [12]:
df.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [13]:
# Or just the column names
df.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [14]:
# Or just the data types, returned as a list of (column, type) tuples
df.dtypes

[('gender', 'string'),
 ('race/ethnicity', 'string'),
 ('parental level of education', 'string'),
 ('lunch', 'string'),
 ('test preparation course', 'string'),
 ('math score', 'int'),
 ('reading score', 'int'),
 ('writing score', 'int')]

In [16]:
# Or just the type of one column
df.schema['lunch'].dataType

StringType

In [15]:
# Neat "describe" function
df.describe(['math score']).show()

+-------+------------------+
|summary|        math score|
+-------+------------------+
|  count|              1000|
|   mean|            66.089|
| stddev|15.163080096009454|
|    min|                 0|
|    max|               100|
+-------+------------------+



In [17]:
# Summary function
df.select("math score", "reading score", "writing score").summary("count", "min", "25%", "75%", "max").show()

+-------+----------+-------------+-------------+
|summary|math score|reading score|writing score|
+-------+----------+-------------+-------------+
|  count|      1000|         1000|         1000|
|    min|         0|           17|           10|
|    25%|        57|           59|           57|
|    75%|        77|           79|           79|
|    max|       100|          100|          100|
+-------+----------+-------------+-------------+



### Read data with a predefined schema

In [25]:
df1 = spark.read.options(header = True, delimiter = ',').csv("../resources/zipcodes.csv")
df1.limit(5).toPandas()

Unnamed: 0,RecordNumber,Zipcode,ZipCodeType,City,State,LocationType,Lat,Long,Xaxis,Yaxis,Zaxis,WorldRegion,Country,LocationText,Location,Decommisioned,TaxReturnsFiled,EstimatedPopulation,TotalWages,Notes
0,1,704,STANDARD,PARC PARQUE,PR,NOT ACCEPTABLE,17.96,-66.22,0.38,-0.87,0.3,,US,"Parc Parque, PR",NA-US-PR-PARC PARQUE,False,,,,
1,2,704,STANDARD,PASEO COSTA DEL SUR,PR,NOT ACCEPTABLE,17.96,-66.22,0.38,-0.87,0.3,,US,"Paseo Costa Del Sur, PR",NA-US-PR-PASEO COSTA DEL SUR,False,,,,
2,10,709,STANDARD,BDA SAN LUIS,PR,NOT ACCEPTABLE,18.14,-66.26,0.38,-0.86,0.31,,US,"Bda San Luis, PR",NA-US-PR-BDA SAN LUIS,False,,,,
3,61391,76166,UNIQUE,CINGULAR WIRELESS,TX,NOT ACCEPTABLE,32.72,-97.31,-0.1,-0.83,0.54,,US,"Cingular Wireless, TX",NA-US-TX-CINGULAR WIRELESS,False,,,,
4,61392,76177,STANDARD,FORT WORTH,TX,PRIMARY,32.75,-97.33,-0.1,-0.83,0.54,,US,"Fort Worth, TX",NA-US-TX-FORT WORTH,False,2126.0,4053.0,122396986.0,


In [27]:
df1.printSchema()
# Without inferSchema or an explicit schema, every column is read in as a string

root
 |-- RecordNumber: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: string (nullable = true)
 |-- TotalWages: string (nullable = true)
 |-- Notes: string (nullable = true)



In [22]:
from pyspark.sql.types import * 

In [24]:
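# The data types do not look correct. Let's define them explicitly and read the file again with the correct schema.
# Note: the sample rows above hold decimals in Xaxis (e.g. 0.38), so DoubleType would fit better than
# IntegerType there; values that do not parse as the declared type typically come back as null.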

schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)

In [30]:
df_with_schema = spark.read.options(header = True, delimiter = ',').format("csv").schema(schema).load("../resources/zipcodes.csv")
df_with_schema.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: integer (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



## How to specify data types as you read in datasets

Some file formats make it easier to infer the schema (tabular formats such as CSV, shown above, have an inferSchema option). 

However, you often have to set the schema yourself when the .read method you are using has no inferSchema option built in.

Spark has all the tools you need for this; it just requires a very specific structure:

In [7]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType,DateType

Next we need to create the list of StructFields. Each StructField takes:
* name: string, name of the field.
* dataType: the DataType of the field.
* nullable: boolean, whether the field can be null (None) or not.

In [8]:
data_schema = [StructField("name", StringType(), True),
               StructField("email", StringType(), True),
               StructField("city", StringType(), True),
               StructField("mac", StringType(), True),
               StructField("timestamp", DateType(), True),
               StructField("creditcard", StringType(), True)]

In [9]:
final_struc = StructType(fields=data_schema)

In [10]:
people = spark.read.json(path+'people.json', schema=final_struc)
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



For reference, here is Spark's latest list of data types in case you need it: https://spark.apache.org/docs/latest/sql-reference.html

In [13]:
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



In [14]:
people.limit(4).toPandas()

Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,


In [15]:
data_schema = [StructField("name", StringType(), True)]
final_struc = StructType(fields=data_schema)
people = spark.read.json(path+'people.json', schema=final_struc)

## Writing Data

**CSV**

In [17]:
from pyspark.sql.types import * #StructField,StringType,IntegerType,StructType

In [18]:
# Note that Spark writes a folder named write_test.csv containing part-* files, one per partition.
# There is no way to directly change these file names. 
df.write.mode("overwrite").csv('write_test.csv')

**Parquet files**

In [None]:
df.write.mode("overwrite").parquet("parquet/")

If you got an error when running the code above, try this solution: https://stackoverflow.com/questions/59220832/unable-to-write-spark-dataframe-to-a-parquet-file-format-to-c-drive-in-pyspark

#### Writing Partitioned Parquet Files

Best practice

In [None]:
df.write.mode("overwrite").partitionBy("race/ethnicity").parquet("partitioned_parquet_test/")

## Select Data

### **Order By**

In [20]:
df.select(['math score','reading score','writing score']).orderBy("writing score").show(5)

+----------+-------------+-------------+
|math score|reading score|writing score|
+----------+-------------+-------------+
|         0|           17|           10|
|        30|           24|           15|
|        28|           23|           19|
|        30|           26|           22|
|         8|           24|           23|
+----------+-------------+-------------+
only showing top 5 rows



### **Order By Descending**

In [21]:
df.select(['math score','reading score','writing score']).orderBy(df["writing score"].desc()).show(5)

+----------+-------------+-------------+
|math score|reading score|writing score|
+----------+-------------+-------------+
|        99|          100|          100|
|        88|           99|          100|
|       100|          100|          100|
|        96|          100|          100|
|        85|           95|          100|
+----------+-------------+-------------+
only showing top 5 rows



### **LIKE**

In [22]:
df.select('gender','parental level of education').where(df.gender.like("%fem%")).show(5, False)

+------+---------------------------+
|gender|parental level of education|
+------+---------------------------+
|female|bachelor's degree          |
|female|some college               |
|female|master's degree            |
|female|associate's degree         |
|female|some college               |
+------+---------------------------+
only showing top 5 rows



### **Substrings**

In [24]:
df.select("lunch",(df.lunch.substr(1,3)).alias('substring')).show(5,False)

+------------+---------+
|lunch       |substring|
+------------+---------+
|standard    |sta      |
|standard    |sta      |
|standard    |sta      |
|free/reduced|fre      |
|standard    |sta      |
+------------+---------+
only showing top 5 rows



### **IS IN**

In [25]:
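# ISIN(list): keep the rows whose value appears in the list
df.filter(df['parental level of education'].isin(['some college'])).limit(4).toPandas()
# The same idea written as a SQL string (df_fifa is read in a few cells below)
#df_fifa['Name','Club','Nationality'].filter("Club IN ('FC Barcelona')").limit(4).toPandas()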

**Starts** with **Ends** with

Search for a specific case - begins with "x" and ends with "x"

In [29]:
df_fifa = spark.read.csv('../Datasets/fifa19.csv',inferSchema=True,header=True)

In [31]:
#df_fifa.printSchema()

In [34]:
df_fifa.select(['Aggression', 'Stamina']).show(5)

+----------+-------+
|Aggression|Stamina|
+----------+-------+
|        48|     72|
|        63|     88|
|        56|     81|
|        38|     43|
|        76|     90|
+----------+-------+
only showing top 5 rows



In [37]:
df_fifa.select(['Aggression', 'Stamina']).summary('count','min').show()

+-------+----------+-------+
|summary|Aggression|Stamina|
+-------+----------+-------+
|  count|     18159|  18159|
|    min|        11|     12|
+-------+----------+-------+



In [46]:
df_fifa.select(["Name", "age", "Club"]).where(df_fifa.Club.like("%celon%")).show(2)

+---------+---+------------+
|     Name|age|        Club|
+---------+---+------------+
| L. Messi| 31|FC Barcelona|
|L. Suárez| 31|FC Barcelona|
+---------+---+------------+
only showing top 2 rows



In [47]:
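# SQL analogue of substr: SELECT SUBSTRING('what a wonderful DAY' FROM 2 FOR 6); -- 'hat a '
# A negative start counts from the end of the string, so (-4, 4) gives the last 4 characters
df_fifa.select("Photo",df_fifa.Photo.substr(-4,4).alias('the last 4 characters')).show(2)

+--------------------+---------------------+
|               Photo|the last 4 characters|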
+--------------------+---------------------+
|https://cdn.sofif...|                 .png|
|https://cdn.sofif...|                 .png|
+--------------------+---------------------+
only showing top 2 rows



In [51]:
df_fifa['Name','Club','Nationality'].filter("Club IN ('FC Barcelona')").limit(2).toPandas()

Unnamed: 0,Name,Club,Nationality
0,L. Messi,FC Barcelona,Argentina
1,L. Suárez,FC Barcelona,Uruguay


In [53]:
df_fifa.select('Name', 'Club').where(df_fifa.Name.startswith("L")) \
    .where(df_fifa.Name.endswith('i')).where(df_fifa.Club.like('%Barcelona')).show(5)

+--------+------------+
|    Name|        Club|
+--------+------------+
|L. Messi|FC Barcelona|
+--------+------------+



### **Slicing**

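pyspark.sql.functions.slice(x, start, length)
Returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length. It operates on array columns; the cells below slice the dataframe itself with limit() and select() instead.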

Note: indexing starts at 1 here

In [55]:
# Slicing a DataFrame: take the first n rows
df.count()
# get a slice of the data
df1 = df.limit(100)
df1.show(1)

+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|   lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|standard|                   none|        72|           72|           74|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
only showing top 1 row



In [61]:
# Slicing: take a subset of the columns
df2 = df_fifa.select('Name', 'Club', "Nationality")
# OR, by position
#df_sel_col = df_fifa.select(df_fifa.columns[0:5])
df2.show(1)

+--------+------------+-----------+
|    Name|        Club|Nationality|
+--------+------------+-----------+
|L. Messi|FC Barcelona|  Argentina|
+--------+------------+-----------+
only showing top 1 row



If you want to just slice your dataframe you can do this....

In [None]:
# Starting
print('Starting row count:',df.count())
print('Starting column count:',len(df.columns))

# Slice rows
df2 = df.limit(300)
print('Sliced row count:',df2.count())

# Slice columns
cols_list = df.columns[0:5]
df3 = df.select(cols_list)
print('Sliced column count:',len(df3.columns))

## Filtering Data

A large part of working with DataFrames is the ability to quickly filter out data based on conditions. Spark DataFrames are built on top of the Spark SQL platform, which means that if you already know SQL, you can quickly and easily grab that data using SQL commands, or using the DataFrame methods (which is what we focus on in this course).

In [63]:
df_fifa.filter("Overall>50").limit(1).toPandas()

21/10/18 14:19:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, Name, Age, Photo, Nationality, Flag, Overall, Potential, Club, Club Logo, Value, Wage, Special, Preferred Foot, International Reputation, Weak Foot, Skill Moves, Work Rate, Body Type, Real Face, Position, Jersey Number, Joined, Loaned From, Contract Valid Until, Height, Weight, LS, ST, RS, LW, LF, CF, RF, RW, LAM, CAM, RAM, LM, LCM, CM, RCM, RM, LWB, LDM, CDM, RDM, RWB, LB, LCB, CB, RCB, RB, Crossing, Finishing, HeadingAccuracy, ShortPassing, Volleys, Dribbling, Curve, FKAccuracy, LongPassing, BallControl, Acceleration, SprintSpeed, Agility, Reactions, Balance, ShotPower, Jumping, Stamina, Strength, LongShots, Aggression, Interceptions, Positioning, Vision, Penalties, Composure, Marking, StandingTackle, SlidingTackle, GKDiving, GKHandling, GKKicking, GKPositioning, GKReflexes, Release Clause
 Schema: _c0, ID, Name, Age, Photo, Nationality, Flag, Overall, Potential, Club, Club Logo, Value,

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M


In [64]:
# Using SQL with .select()
df_fifa.filter("Overall>50").select(['ID','Name','Nationality','Overall']).limit(4).toPandas()

Unnamed: 0,ID,Name,Nationality,Overall
0,158023,L. Messi,Argentina,94
1,20801,Cristiano Ronaldo,Portugal,94
2,190871,Neymar Jr,Brazil,92
3,193080,De Gea,Spain,91


### Collecting Results as Python Objects

In [66]:
# Collecting results as Python objects
# you need the ".collect()" call at the end to "collect" the results
result = df_fifa.select(['Nationality','Name','Age','Overall']).filter("Overall>70").orderBy(df_fifa["Overall"].desc()).collect()

In [67]:
print("Best Player Over 70: ",result[0][1])

Best Player Over 70:  L. Messi


Rows can also be turned into dictionaries if needed (with .asDict()). Iterating over a row yields its values in column order:

In [69]:
for item in result[0]:
    print(item)

Argentina
L. Messi
31
94


# <span class="burk">SQL Options in Spark</span>

### Spark SQL

Spark provides two DataFrame methods that register a temporary view, so that users can run SQL queries against a Spark DataFrame:

createOrReplaceTempView: The lifetime of this temporary view is tied to the SparkSession that was used to create the DataFrame. It creates (or replaces, if that view name already exists) a lazily evaluated "view" that you can then use like a Hive table in Spark SQL. It does not persist to memory unless you cache the dataset that underpins the view.

createGlobalTempView: The lifetime of this temporary view is tied to the Spark application, so it can be shared across sessions. It is registered in the global_temp database and is queried as global_temp.<view name>.

In [74]:
crime = spark.read.csv(path+"rec-crime-pfa.csv",header=True,inferSchema=True)

In [87]:
# toPandas() is much easier to read than .show()
crime.limit(3).toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Rolling year total number of offences
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202


To run SQL against this dataframe comfortably, we should rename any columns that have spaces in their names (otherwise they must be quoted with backticks in every query). We will not be using the first column, so I'll leave that one as is, but we will be using the last one, so I will rename it to Count.

In [76]:
df = crime.withColumnRenamed('Rolling year total number of offences','Count') #.withColumn("12 months ending", crime["12 months ending"].cast(DateType())).
df.printSchema()

root
 |-- 12 months ending: string (nullable = true)
 |-- PFA: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Offence: string (nullable = true)
 |-- Count: integer (nullable = true)


In [77]:
# Create a temporary view of the dataframe
df.createOrReplaceTempView("tempview")

In [85]:
# Then Query the temp view
spark.sql("SELECT * FROM tempview WHERE Count > 1000").limit(1).toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Count
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959


In [84]:
# Or choose which vars you want
spark.sql("SELECT Region, PFA FROM tempview WHERE Count > 1000").limit(1).toPandas()

Unnamed: 0,Region,PFA
0,South West,Avon and Somerset


In [83]:
# You can also pass your query results to an object 
# (we don't need to use .collect() here)
sql_results = spark.sql("SELECT * FROM tempview WHERE Count > 1000 AND Region='South West'")
sql_results.limit(1).toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Count
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959


In [82]:
# We can even do aggregated "group by" calls like this
spark.sql("SELECT Region, sum(Count) AS Total FROM tempview GROUP BY Region").limit(1).toPandas()

Unnamed: 0,Region,Total
0,Fraud: CIFAS,7678981


<div class="girk">
**Bonus!** <br>
Not included in the lecture, but some of you may enjoy this. If you want to write more freeform SQL, you can enclose your query in triple quotes like this. The example here uses a CTE, which is a more advanced SQL feature. A common table expression (CTE) defines a temporary result set that can be referenced, possibly multiple times, within the scope of a SQL statement. A CTE is used mainly in a SELECT statement. Many people find this very useful. </div><i class="fa fa-lightbulb-o "></i>

In [88]:
# We could also run more complex SQL queries, such as a nested CTE
spark.sql("""WITH t AS (
    WITH tempview AS (SELECT 1)
    SELECT * FROM tempview)
SELECT * FROM t;""").toPandas()

Unnamed: 0,1
0,1


**SQL Transformer**

You can also use the SQLTransformer, which lets you write freeform SQL statements.

In [89]:
# First we need to import SQL transformer
from pyspark.ml.feature import SQLTransformer

In [90]:
# Then we create an SQL call 
sqlTrans = SQLTransformer(
    statement="SELECT PFA,Region,Offence FROM __THIS__") 
# And use it to transform our df object
sqlTrans.transform(df).show(3)

+-----------------+----------+--------------------+
|              PFA|    Region|             Offence|
+-----------------+----------+--------------------+
|Avon and Somerset|South West|All other theft o...|
|Avon and Somerset|South West|       Bicycle theft|
|Avon and Somerset|South West|Criminal damage a...|
+-----------------+----------+--------------------+
only showing top 3 rows



In [91]:
type(sqlTrans)

pyspark.ml.feature.SQLTransformer

In [94]:
# Note that "__THIS__" is a special word and cannot be changed to __THAT__ for example
sqlTrans = SQLTransformer(
    statement="SELECT PFA,Region,Offence FROM __THIS__") 
# And use it to transform our df object
sqlTrans.transform(df).show(2)

+-----------------+----------+--------------------+
|              PFA|    Region|             Offence|
+-----------------+----------+--------------------+
|Avon and Somerset|South West|All other theft o...|
|Avon and Somerset|South West|       Bicycle theft|
+-----------------+----------+--------------------+
only showing top 2 rows



In [95]:
# Also Note that a call like this won't work...
SQLTransformer(statement="SELECT PFA,Region,Offence FROM __THIS__").show()

AttributeError: 'SQLTransformer' object has no attribute 'show'

**Now how about a group by call**

In [98]:
# Note that this call will not work on the original dataframe "crime": it has no "Count" column, and SUM needs a numeric column rather than a string

sqlTrans = SQLTransformer(
    statement="SELECT Offence, SUM(Count) as Total FROM __THIS__ GROUP BY Offence") 
sqlTrans.transform(df).show(2)

+--------------------+--------+
|             Offence|   Total|
+--------------------+--------+
|Public order offe...|10925676|
|       Bicycle theft| 5297006|
+--------------------+--------+
only showing top 2 rows



In [99]:
sqlTrans = SQLTransformer(
    statement="SELECT PFA,Offence FROM __THIS__ WHERE Count > 1000") 
sqlTrans.transform(df).show(2)

+-----------------+--------------------+
|              PFA|             Offence|
+-----------------+--------------------+
|Avon and Somerset|All other theft o...|
|Avon and Somerset|       Bicycle theft|
+-----------------+--------------------+
only showing top 2 rows



In [100]:
# You can also, of course, assign the output to a new dataframe
result = sqlTrans.transform(df)
result.show(2)

+-----------------+--------------------+
|              PFA|             Offence|
+-----------------+--------------------+
|Avon and Somerset|All other theft o...|
|Avon and Somerset|       Bicycle theft|
+-----------------+--------------------+
only showing top 2 rows



## SQL Options within regular PySpark calls

### The expr function in PySpark's SQL Function Library

You can also use the expr function within the pyspark.sql.functions library coupled with either PySpark's withColumn function or the select function.

In [101]:
# First we need to import the function
from pyspark.sql.functions import expr 

Let's add a percent column to the dataframe. To do this, we first need the grand total of the Count column (unfortunately, we can't soft-code this inside the expression, so we compute it here and paste the value in).

In [102]:
sqlTrans = SQLTransformer(
    statement="SELECT SUM(Count) as Total FROM __THIS__") 
sqlTrans.transform(df).show(2)

+---------+
|    Total|
+---------+
|244720928|
+---------+



In [104]:
# We could add a percent column to our df 
# that shows the offence %
# with the "withColumn" command
df.withColumn("percent",expr("round((count/244720928)*100,2)")).show(4)

+----------------+-----------------+----------+--------------------+-----+-------+
|12 months ending|              PFA|    Region|             Offence|Count|percent|
+----------------+-----------------+----------+--------------------+-----+-------+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|    0.0|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|   0.01|
|      31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|    0.0|
+----------------+-----------------+----------+--------------------+-----+-------+
only showing top 4 rows



In [105]:
# Same thing with the "select" command
df.select("*",expr("round((count/244720928)*100,2) AS percent")).show(4)

+----------------+-----------------+----------+--------------------+-----+-------+
|12 months ending|              PFA|    Region|             Offence|Count|percent|
+----------------+-----------------+----------+--------------------+-----+-------+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|    0.0|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|   0.01|
|      31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|    0.0|
+----------------+-----------------+----------+--------------------+-----+-------+
only showing top 4 rows



### PySpark's selectExpr function

Very similar idea here but slightly different syntax: selectExpr takes the SQL expression strings directly, so you don't need to wrap them in expr.

In [107]:
df.selectExpr("*","round((count/244720928)*100,2) AS percent").filter("Region ='South West'").show(4)

+----------------+-----------------+----------+--------------------+-----+-------+
|12 months ending|              PFA|    Region|             Offence|Count|percent|
+----------------+-----------------+----------+--------------------+-----+-------+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|    0.0|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|   0.01|
|      31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|    0.0|
+----------------+-----------------+----------+--------------------+-----+-------+
only showing top 4 rows



# <span class="burk">GroupBy and Aggregate Functions</span>

Let's learn how to use GroupBy and Aggregate methods on a DataFrame. GroupBy allows you to group rows together based on some column value; for example, you could group sales data by the day the sale occurred, or group repeat customer data by the name of the customer. Once you've performed the GroupBy operation you can apply an aggregate function to that data. An aggregate function aggregates multiple rows of data into a single output, such as taking the sum of inputs, or counting the number of inputs.

Let's see some examples on an example dataset!

In [109]:
# Start by reading in a basic csv dataset
# Let Spark know about the header and infer the Schema types!

#Some csv data
airbnb = spark.read.csv('../Datasets/nyc_air_bnb.csv',inferSchema=True,header=True)

In [111]:
# Notice here that some of the columns that are obviously numeric have been incorrectly identified as "strings". Let's edit that. Otherwise we cannot aggregate any of the numeric columns.
from pyspark.sql.types import *
from pyspark.sql.functions import *

df = airbnb.withColumn("price", airbnb["price"].cast(IntegerType())) \
        .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType())) \
        .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType())) \
        .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType())) \
        .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))
#QA
print(df.printSchema())
df.limit(1).toPandas()


root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0,6,365


# GroupBy and Aggregate Functions

Let's learn how to use GroupBy and Aggregate methods on a DataFrame. These two commands often go hand in hand in PySpark. In fact, a GroupBy on its own only produces a grouped object: to get a DataFrame back, you also have to tell Spark which aggregate you want, for example count, average, or min/max.

You can also use aggregate functions on their own to learn about the overall statistics of your dataframe, which we will see in some of our examples.
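To make the two steps concrete, here is a minimal plain-Python sketch of what a GroupBy followed by an aggregate computes. It is not how Spark implements it (Spark distributes the work), and the rows are hypothetical values made up for illustration. The explicit `builtins` import is there because the notebook's wildcard import from `pyspark.sql.functions` shadows Python's own `min` and `max`.

```python
from builtins import max, min  # undo any shadowing by pyspark's wildcard import
from collections import defaultdict

# Hypothetical (neighbourhood, price) rows, made up for illustration.
rows = [("Corona", 60), ("Corona", 45), ("Mill Basin", 85), ("Mill Basin", 299)]

# Step 1 (groupBy): bucket the rows by the grouping column.
groups = defaultdict(list)
for neighbourhood, price in rows:
    groups[neighbourhood].append(price)

# Step 2 (agg): collapse every bucket into a single output row.
summary = {
    name: {"count": len(prices), "min": min(prices), "max": max(prices)}
    for name, prices in groups.items()
}
print(summary)
```

Each group ends up as exactly one row, no matter how many input rows it held, which is why a GroupBy always needs an aggregate to go with it.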

So let's dig in!

In [112]:
# For example we may be interested to see how many listings there were per neighbourhood group. 
# Groupby Function with count (you can also use sum, min, max)
df.groupBy("neighbourhood_group").count().show(7)

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|         Douglaston|    1|
|             Queens| 5630|
|              Nadia|    1|
|            Midtown|    4|
|    Jackson Heights|    2|
|     Hell's Kitchen|    7|
|  Greenwich Village|    2|
+-------------------+-----+
only showing top 7 rows



In [114]:
# Then you can add the following aggregate functions: mean, count, min, max, sum
# Like this for example
df.groupBy("neighbourhood_group").mean("price").show(3)

+-------------------+-----------------+
|neighbourhood_group|       avg(price)|
+-------------------+-----------------+
|         Douglaston|              1.0|
|             Queens|99.57690941385435|
|              Nadia|             null|
+-------------------+-----------------+
only showing top 3 rows



In [115]:
# This is another way of doing the above but I don't recommend it
# because a dict can only hold one aggregate per column
df.groupBy("neighbourhood").agg({'price':'mean'}).show(3)

+-------------+----------+
|neighbourhood|avg(price)|
+-------------+----------+
|       Corona| 59.171875|
| Richmondtown|      78.0|
| Prince's Bay|     409.5|
+-------------+----------+
only showing top 3 rows



In [116]:
# This method is way more versatile
# Allows you to call on more than one aggregate function at a time
# It's my fav for this reason!
from pyspark.sql.functions import *
df.groupBy("neighbourhood").agg(min(df.price).alias("Min Price"),max(df.price).alias("Max Price")).show(5)

+-------------+---------+---------+
|neighbourhood|Min Price|Max Price|
+-------------+---------+---------+
|       Corona|       23|      359|
| Richmondtown|       78|       78|
| Prince's Bay|       85|     1250|
|  Westerleigh|       40|      103|
|   Mill Basin|       85|      299|
+-------------+---------+---------+
only showing top 5 rows



In [117]:
# This is also a pretty neat function you can use:
summary = df.summary("count", "min", "25%", "75%", "max")
summary.toPandas()
# But be careful because it'll perform this operation on your whole df!

Unnamed: 0,summary,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,count,49079,49047,48894,48873,48894,48894,48894,48894,48894,48887,48891,48738,38845.0,38858,48891,48737
1,min,"12 mins Manhattan""",1 Bed Apt in Utopic Williamsburg,"Heart of Greenwich Village""","very clean studio app""",194716858,2,-73.72247,-73.71299,-73.90783,-74,0,0,-73.94134,0,0,0
2,25%,9471893.0,2.4544724E7,7797690.0,475.0,1.94716858E8,40.68771,40.68981,-73.98309,56.0,69,1,1,0.76,0,1,0
3,75%,2.9152899E7,1.74786681E8,1.07434423E8,,1.97400421E8,40.78304,40.76299,-73.93638,145.0,175,5,23,3.24,2,2,226
4,max,"獨一無二的紐約閣樓""","ﾏﾝﾊｯﾀﾝ､駅から徒歩4分でどこに行くのにも便利な場所!女性の方希望,ｷﾚｲなお部屋｡",呈刚,현선,Woodside,Woodside,West Village,Shared room,Shared room,10000,1250,629,9.66,58,365,365


In [118]:
# Eh that was ugly!
# To do a summary for specific columns first select them:
# limit_summary = df.select("price","minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365").summary("count","min","max")
limit_summary = df.select("price","minimum_nights","number_of_reviews").summary("count","min","max")
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews
0,count,48887,48891,48738
1,min,-74,0,0
2,max,10000,1250,629


### Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()).

This is great, but what if we wanted the overall summary metrics like average and counts for more than one variable and without a groupBy variable? We could do this using the pyspark.sql.functions library.

In [119]:
# Aggregate!
# agg(*exprs)
# Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()).
# available agg functions include: min, max, count, countDistinct, approx_count_distinct
# df.agg(covar_pop(col1, col2)) Returns a new Column for the population covariance of col1 and col2
# df.agg(covar_samp(col1, col2)) Returns a new Column for the sample covariance of col1 and col2.
# df.agg(corr(col1, col2)) Returns a new Column for the Pearson Correlation Coefficient for col1 and col2.

from pyspark.sql.functions import *
df.agg(min(df.price).alias("Min Price"),max(df.price).alias("Max Price")).show()

+---------+---------+
|Min Price|Max Price|
+---------+---------+
|      -74|    10000|
+---------+---------+



In [120]:
# There is also this method which is pretty similar
df.select(countDistinct("neighbourhood_group").alias('CountD'),avg('price'),stddev("price")).show()



+------+------------------+------------------+
|CountD|        avg(price)|stddev_samp(price)|
+------+------------------+------------------+
|    77|152.22298361527604|238.54146688839478|
+------+------------------+------------------+



In [121]:
# You could also write the syntax like this....
# But keep in mind that with this method you can only request one aggregate per column (bummer)
# Again I don't recommend this!
# Max number of reviews across everything
df.agg({'number_of_reviews':'max'}).withColumnRenamed("max(number_of_reviews)", "Max Reviews").show()

+-----------+
|Max Reviews|
+-----------+
|        629|
+-----------+



### Pivot Function
Provides a two-way table and must be used in conjunction with groupBy.

In [123]:
# Pivot Function
# pivot(pivot_col, values=None)
df.groupBy("room_type").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).count().show(3)



+---------+------+--------+
|room_type|Queens|Brooklyn|
+---------+------+--------+
|       51|  null|    null|
|      205|  null|    null|
|       54|  null|    null|
+---------+------+--------+
only showing top 3 rows





In [124]:
# You can also filter your results if you need to
# We saw some invalid data in the above output
# So we could select only the "Shared room" types if we wanted to
df.filter("room_type='Shared room'").groupBy("room_type") \
    .pivot("neighbourhood_group", ["Queens", "Brooklyn"]).count().show(100)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|Shared room|   198|     413|
+-----------+------+--------+
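As a rough mental model of the pivot above, here is a plain-Python sketch of a pivoted count. The rows are hypothetical values made up for illustration, and `pivot_values` plays the role of the optional `values` list passed to `pivot()`: anything not in that list (Manhattan here) is left out of the table, and empty cells come back as `None`, much like the nulls Spark shows.

```python
from collections import Counter

# Hypothetical (room_type, neighbourhood_group) rows, made up for illustration.
rows = [
    ("Shared room", "Queens"),
    ("Shared room", "Brooklyn"),
    ("Shared room", "Brooklyn"),
    ("Private room", "Queens"),
    ("Private room", "Manhattan"),
]
pivot_values = ["Queens", "Brooklyn"]  # same role as pivot()'s `values` list

# Count every (row label, column label) pair once.
cell_counts = Counter(rows)

# One output row per room_type, one output column per pivot value.
table = {
    room_type: {value: cell_counts.get((room_type, value)) for value in pivot_values}
    for room_type in sorted({room for room, _ in rows})
}
print(table)
```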



### Combine all three!
It is also possible to combine all three methods into one call: GroupBy, Pivot and Agg, like this:

In [125]:
# from pyspark.sql.functions import *
df.groupBy("neighbourhood").pivot("neighbourhood_group", ["Queens", "Brooklyn"]) \
    .agg(min(df.price).alias("Min Price"),max(df.price).alias("Max Price")).toPandas()#.show()
# Note The toPandas() method should only be used if the resulting Pandas’s DataFrame is expected to be small, 
# as all the data is loaded into the driver’s memory.

Unnamed: 0,neighbourhood,Queens_Min Price,Queens_Max Price,Brooklyn_Min Price,Brooklyn_Max Price
0,Corona,23.0,359.0,,
1,Prince's Bay,,,,
2,Richmondtown,,,,
3,Mill Basin,,,85.0,299.0
4,Westerleigh,,,,
...,...,...,...,...,...
378,40.69383,,,,
379,Morningside Heights,,,,
380,Greenpoint,,,0.0,10000.0
381,Elmhurst,15.0,443.0,,


# <span class="burk">Joining and Appending DataFrames in PySpark</span>

## Generate play data

First some play data to help us grasp some concepts. Let's create a database that has two tables. 

**Key Terms**
 - **omnivore**: an animal which is able to consume both plants (like a herbivore) and meat (like a carnivore)
 - **herbivore**: any animal that eats only vegetation (i.e. that eats no meat)
 - **carnivore**: any animal that eats meat as the main part of its diet

In [127]:
valuesP = [('koala',1,'yes'),('caterpillar',2,'yes'),('deer',3,'yes'),('human',4,'yes')]
eats_plants = spark.createDataFrame(valuesP,['name','id','eats_plants'])

valuesM = [('shark',5,'yes'),('lion',6,'yes'),('tiger',7,'yes'),('human',4,'yes')]
eats_meat = spark.createDataFrame(valuesM,['name','id','eats_meat'])

print("Plant eaters (herbivores)")
print(eats_plants.show())
print("Meat eaters (carnivores)")
print(eats_meat.show())

Plant eaters (herbivores)


+-----------+---+-----------+
|       name| id|eats_plants|
+-----------+---+-----------+
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
+-----------+---+-----------+

None
Meat eaters (carnivores)
+-----+---+---------+
| name| id|eats_meat|
+-----+---+---------+
|shark|  5|      yes|
| lion|  6|      yes|
|tiger|  7|      yes|
|human|  4|      yes|
+-----+---+---------+

None


### Appends
Appending stacks two dataframes that have the exact same variables. You can think of it like stacking two or more blocks ON TOP of each other. To demonstrate this, we will simply append the dataframe to itself since we don't really have a good use case for this. But hopefully this will help you imagine what to do.

A common use case would be appending the same table of information from one year to another year (i.e. 2012 + 2013 + ...)



In [129]:
# So first replicate table and call it new_df
new_df = eats_plants
# Then append using the union function
# Note for SQL users: DataFrame.union keeps duplicate rows (like SQL's UNION ALL)
# and matches columns by position, not by name
df_concat = eats_plants.union(new_df)
# We will test to see if this worked by getting before and after row counts
print(("eats_plants df Counts:", eats_plants.count(), len(eats_plants.columns)))
print(("df_concat Counts:", df_concat.count(), len(df_concat.columns)))
print(eats_plants.show(5))
print(df_concat.show(5))

('eats_plants df Counts:', 4, 3)
('df_concat Counts:', 8, 3)
+-----------+---+-----------+
|       name| id|eats_plants|
+-----------+---+-----------+
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
+-----------+---+-----------+

None
+-----------+---+-----------+
|       name| id|eats_plants|
+-----------+---+-----------+
|      koala|  1|        yes|
|caterpillar|  2|        yes|
|       deer|  3|        yes|
|      human|  4|        yes|
|      koala|  1|        yes|
+-----------+---+-----------+
only showing top 5 rows

None


## Inner Joins!

Inner joins get us ONLY the values that appear in BOTH tables we are joining. 

In [131]:
inner_join = eats_plants.join(eats_meat, ["name","id"],"inner")
print("Inner Join Example")
print(inner_join.show())
# So this is the only name that appears in BOTH dataframes

Inner Join Example
+-----+---+-----------+---------+
| name| id|eats_plants|eats_meat|
+-----+---+-----------+---------+
|human|  4|        yes|      yes|
+-----+---+-----------+---------+

None


## Left Joins

Left joins keep every row of the left table and add the right table's columns; where the right table has no match, those columns are null. A quick quality check we could do would be to make sure that the human row has the value "yes" for both the eats_plants and eats_meat columns.

In [132]:
left_join = eats_plants.join(eats_meat, ["name","id"], how='left') # Could also use 'left_outer'
print("Left Join Example")
print(left_join.show())

Left Join Example
+-----------+---+-----------+---------+
|       name| id|eats_plants|eats_meat|
+-----------+---+-----------+---------+
|       deer|  3|        yes|     null|
|      human|  4|        yes|      yes|
|      koala|  1|        yes|     null|
|caterpillar|  2|        yes|     null|
+-----------+---+-----------+---------+

None


## Conditional Joins

Conditional joins add some logic on top of the underlying join. For example, if we wanted to get all the values that appear in the left table, **except** for those that appear in BOTH tables, we could filter the left join for rows where the right side is null. Notice how human is left out now.

In [133]:
conditional_join = eats_plants.join(eats_meat, ["name","id"], how='left').filter(eats_meat.name.isNull())
print("Conditional Left Join")
print(conditional_join.show())

Conditional Left Join
+-----------+---+-----------+---------+
|       name| id|eats_plants|eats_meat|
+-----------+---+-----------+---------+
|       deer|  3|        yes|     null|
|      koala|  1|        yes|     null|
|caterpillar|  2|        yes|     null|
+-----------+---+-----------+---------+

None


## Right Join

A right join keeps every row of the right table and adds the left table's columns; where the left table has no match, those columns are null. Notice that human, which appears in both tables, is still included.

In [134]:
right_join = eats_plants.join(eats_meat,  ["name","id"],how='right') # Could also use 'right_outer'
print("Right Join")
print(right_join.show())

Right Join
+-----+---+-----------+---------+
| name| id|eats_plants|eats_meat|
+-----+---+-----------+---------+
|shark|  5|       null|      yes|
|human|  4|        yes|      yes|
|tiger|  7|       null|      yes|
| lion|  6|       null|      yes|
+-----+---+-----------+---------+

None


## Full Outer Joins

Full outer joins will get all rows from both tables. Notice that because we join on a list of column names (id and name in this case), those common columns appear only once in the result (see human, which shows up as a single row with id 4), and rows found in only one table get null for the other table's column.

In [135]:
full_outer_join = eats_plants.join(eats_meat, ["name","id"],how='full') # Could also use 'full_outer'
print("Full Outer Join")
print(full_outer_join.show())

Full Outer Join
+-----------+---+-----------+---------+
|       name| id|eats_plants|eats_meat|
+-----------+---+-----------+---------+
|       deer|  3|        yes|     null|
|      shark|  5|       null|      yes|
|      human|  4|        yes|      yes|
|      tiger|  7|       null|      yes|
|       lion|  6|       null|      yes|
|      koala|  1|        yes|     null|
|caterpillar|  2|        yes|     null|
+-----------+---+-----------+---------+

None
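The join types above can be summarized by looking only at which keys survive. The following is a plain-Python sketch using sets, with the (name, id) keys copied from the toy tables. It assumes the keys are unique on each side, as they are here; with duplicate keys a real join would multiply rows, which sets cannot show.

```python
# Join keys (name, id) copied from the two toy tables above.
plants = {("koala", 1), ("caterpillar", 2), ("deer", 3), ("human", 4)}
meat = {("shark", 5), ("lion", 6), ("tiger", 7), ("human", 4)}

join_keys = {
    "inner": plants & meat,              # keys present in BOTH tables
    "left": plants,                      # every key of the left table
    "left, unmatched only": plants - meat,  # the conditional join above
    "right": meat,                       # every key of the right table
    "full": plants | meat,               # every key from either table
}

for how, keys in join_keys.items():
    print(f"{how:>20}: {len(keys)} rows -> {sorted(keys)}")
```

The row counts line up with the Spark outputs above: 1 for the inner join, 4 for left and right, 3 for the conditional join and 7 for the full outer join.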


## Alright now let's try with REAL data

Thinking about how to join your data in real life will not be as easy as the above. You need to consider multiple aspects as you join tables in real life and ALWAYS conduct sanity checks to make sure you did it correctly. Let's look at an example below with real data.

#### First, let's read in the datasets we will be working with

Here is a neat snippet that will read in all the csv files from a directory (folder) in one shot and create a separate dataframe for each dataset in the directory, named after its file. This is super useful if you have a large set of files and don't feel like writing a separate line for each dataset in the directory.

In [136]:
import os

path = "../Datasets/uw-madison-courses/"

df_list = []
for filename in os.listdir(path):
    if filename.endswith(".csv"):
        filename_list = filename.split(".") # separate the file name from its .csv extension
        df_name = filename_list[0]
        df = spark.read.csv(path+filename,inferSchema=True,header=True)
        df.name = df_name
        df_list.append(df_name)
        globals()[df_name] = df # bind the dataframe to a variable named after the file
        
# QA
print("Full list of dfs:")
print(df_list)

Full list of dfs:
['subjects', 'subject_memberships', 'rooms', 'schedules', 'sections', 'courses', 'course_offerings', 'instructors', 'teachings', 'grade_distributions']


## About this database

You will notice that there are several tables in the uw-madison-courses folder that were read in above. This database will give you a chance to practice your own custom joins and learn how the relationships in a real database work. Sometimes we don't know how the tables are related and we need to figure it out! I'll save that for the HW since we will be using the same database :) So I just wanted to introduce the database to you quickly here first.

For this lecture, we will focus on the 4 datasets below and save the rest for the HW. Here is a look at some of the important variables we will be using to join our tables:

 - **course_offerings:** uuid, course_uuid, term_code, name
 - **instructors:** id, name
 - **sections:** uuid, course_offering_uuid, room_uuid, schedule_uuid
 - **teachings:** instructor_id, section_uuid
 
 **Source:** https://www.kaggle.com/Madgrades/uw-madison-courses
 
Let's pretend that I am a student interested in seeing what courses are available. I suppose I would start by looking at the course offerings table.

In [137]:
# View the data
course_offerings.limit(4).toPandas()

Unnamed: 0,uuid,course_uuid,term_code,name
0,344b3ebe-da7e-314c-83ed-9425269695fd,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1092,Cooperative Education Prog
1,f718e6cd-33f0-3c14-a9a6-834d9c3610a8,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1082,Cooperative Education Prog
2,ea3b717c-d66b-30dc-8b37-964d9688295f,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1172,Cooperative Education Prog
3,075da420-5f49-3dd0-93df-13e3c152e1b1,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1114,Cooperative Education Prog


This course offerings table is great, but I also want to know who teaches each course because I want to check the reviews of the instructor before I take the course. Let's see if we can join this table with the instructors table that contains the name of the instructor.

In [138]:
instructors.show(4,False)

+-------+------------------+
|id     |name              |
+-------+------------------+
|761703 |JOHN ARCHAMBAULT  |
|3677061|STEPHANIE KANN    |
|788586 |KATHY PREM        |
|1600463|KRISTIN KLARKOWSKI|
+-------+------------------+
only showing top 4 rows



Hmmm, so this table only contains 2 columns (id and name) and doesn't have the uuid or course uuid to join on. So we will need to see how we can accomplish the join we need. It looks like from the tables we have, we would need to take the following steps to get the variables we need. 

 - **course_offerings (CO):** uuid, course_uuid, term_code, name
 - **instructors (I):** id, name
 - **sections (S):** uuid, course_offering_uuid, room_uuid, schedule_uuid
 - **teachings (T):** instructor_id, section_uuid
 
 I.id --> T.instructor_id
                \/
          T.section_uuid --> S.uuid
                              \/
                             S.course_offering_uuid --> CO.uuid

In [139]:
teachings.show(3)

+-------------+--------------------+
|instructor_id|        section_uuid|
+-------------+--------------------+
|       761703|45adf63c-48c9-365...|
|       761703|c6280e23-5e43-385...|
|       761703|9395dc21-15d1-3fa...|
+-------------+--------------------+
only showing top 3 rows



In [140]:
# Let's try to see all course offerings and who teaches it
# Notice here that the variable we want to join on is different in the two datasets. 
# PySpark makes it easy to account for that
step1 = teachings.join(instructors, teachings.instructor_id == instructors.id, how='left').select(['instructor_id','name','section_uuid'])
step1.limit(4).toPandas()

Unnamed: 0,instructor_id,name,section_uuid
0,761703,JOHN ARCHAMBAULT,45adf63c-48c9-3659-8561-07556d2d4ddf
1,761703,JOHN ARCHAMBAULT,c6280e23-5e43-3859-893e-540d94993529
2,761703,JOHN ARCHAMBAULT,9395dc21-15d1-3fab-8d1f-6f3fe6114c48
3,3677061,STEPHANIE KANN,b99e440b-39db-350a-81eb-b6eb1bd8b0bc


In [141]:
step2 = step1.join(sections, step1.section_uuid == sections.uuid, how='left').select(['name','course_offering_uuid'])
step2.limit(4).toPandas()

Unnamed: 0,name,course_offering_uuid
0,THOMAS JAHNS,f850ab24-740c-311a-a669-804a3fea7b0b
1,JEAN-FRANCOIS HOUDE,7e213b2b-c58b-3014-b3d1-01c0f7ed46ef
2,CHRISTOPHER R TABER,3beb7bd7-4877-3c63-8afc-62f8b74e72fc
3,MARISA S OTEGUI,db253216-2e66-3267-86b2-7b9f5fe07223


In [142]:
step3 = step2.withColumnRenamed('name', 'instructor').join(course_offerings, step2.course_offering_uuid == course_offerings.uuid, how='inner').select(['instructor','name','course_offering_uuid'])
step3.limit(4).toPandas()

Unnamed: 0,instructor,name,course_offering_uuid
0,THOMAS JAHNS,Master's Research or Thesis,f850ab24-740c-311a-a669-804a3fea7b0b
1,JEAN-FRANCOIS HOUDE,Wrkshp-Industrl Organizatn,7e213b2b-c58b-3014-b3d1-01c0f7ed46ef
2,CHRISTOPHER R TABER,Workshop - Public Economics,3beb7bd7-4877-3c63-8afc-62f8b74e72fc
3,MARISA S OTEGUI,Plant Cell Biology,db253216-2e66-3267-86b2-7b9f5fe07223


And that's it! Sometimes it's helpful to think through joins step by step like this. I hope that helped get the concept down. 

## One final really cool way to join datasets: The Levenshtein distance!

The Levenshtein distance counts the number of single-character edits (insertions, deletions or substitutions) you would need to make two strings equal to each other. I'll let you figure out the joining part in the HW!

In [143]:
# Compute the Levenshtein distance between two strings
# pyspark.sql.functions.levenshtein(left, right)  

from pyspark.sql.functions import levenshtein

df0 = spark.createDataFrame([('Aple', 'Apple','Microsoft','IBM')], ['Input', 'Option1','Option2','Option3'])
print("Correct this company name: Aple")
df0.select(levenshtein('Input', 'Option1').alias('Apple')).show()
df0.select(levenshtein('Input', 'Option2').alias('Microsoft')).show()
df0.select(levenshtein('Input', 'Option3').alias('IBM')).show()

Correct this company name: Aple
+-----+
|Apple|
+-----+
|    1|
+-----+

+---------+
|Microsoft|
+---------+
|        9|
+---------+

+---+
|IBM|
+---+
|  4|
+---+



In [None]:
## Joins, all options (TableA and TableB are placeholders for your own dataframes)

inner_join = TableA.join(TableB, ["name","id"],"inner")
print("Inner Join Example")
print(inner_join.show())

left_join = TableA.join(TableB, ["name","id"], how='left') # Could also use 'left_outer'
print("Left Join Example")
print(left_join.show())

conditional_join = TableA.join(TableB, ["name","id"], how='left').filter(TableB.name.isNull())
print("Conditional Left Join")
print(conditional_join.show())

right_join = TableA.join(TableB,  ["name","id"],how='right') # Could also use 'right_outer'
print("Right Join")
print(right_join.show())

full_outer_join = TableA.join(TableB, ["name","id"],how='full') # Could also use 'full_outer'
print("Full Outer Join")
print(full_outer_join.show())

# <span class="burk">Handling missing data</span> 

In [145]:
#Some csv data
zomato = spark.read.csv('../Datasets/zomato.csv',inferSchema=True,header=True)
df = zomato.withColumn("approx_cost(for two people)", zomato["approx_cost(for two people)"].cast(IntegerType())) \
    .withColumn("votes", zomato["votes"].cast(IntegerType()))

In [147]:
df.filter(df.cuisines.isNull()).select(['name','cuisines']).show(10)

+--------------------+--------+
|                name|cuisines|
+--------------------+--------+
|               Jalsa|    null|
|       Grand Village|    null|
|       Casual Dining|    null|
|     Timepass Dinner|    null|
|       Casual Dining|    null|
|Rosewood Internat...|    null|
|       Casual Dining|    null|
|              Onesta|    null|
|      Penthouse Cafe|    null|
|           Smacznego|    null|
+--------------------+--------+
only showing top 10 rows



**Missing Data Statistics**

In [149]:
from pyspark.sql.functions import isnan, when, count, col, format_number

# Count the rows once instead of once per column
n_rows = df.count()

nulls = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
percent = df.select([format_number((count(when(isnan(c) | col(c).isNull(), c)) / n_rows) * 100, 1).alias(c) for c in df.columns])

result = nulls.union(percent)

result.toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,0.0,0.0,85.0,8111.0,2.0,7775.0,20018.0,1227.0,20054.0,20165.0,46841.0,27305.0,43611.0,28185.0,28611.0,28983.0,29344.0
1,0.0,0.0,0.1,11.3,0.0,10.8,27.9,1.7,28.0,28.1,65.3,38.1,60.8,39.3,39.9,40.4,40.9
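The two rows above are the missing count and the missing percentage per column. As a plain-Python sketch of that per-column logic (the values are hypothetical, made up for illustration, and `is_missing` is a hypothetical helper), a value counts as missing when it is either null or NaN:

```python
import math
from builtins import round  # undo any shadowing by pyspark's wildcard import


def is_missing(value) -> bool:
    """True for None (a Spark null) or a float NaN, like isNull() | isnan()."""
    return value is None or (isinstance(value, float) and math.isnan(value))


# Hypothetical values of one column, made up for illustration.
column = [4.1, None, float("nan"), 3.8, None]

missing = len([value for value in column if is_missing(value)])
percent = round(100 * missing / len(column), 1)
print(missing, percent)
```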


In [150]:
from pyspark.sql.functions import *

def null_value_count(df):
    # Return a list of (column name, null count) pairs for columns that contain nulls
    null_columns_counts = []
    for k in df.columns:
        null_rows = df.where(col(k).isNull()).count()
        if null_rows > 0:
            null_columns_counts.append((k, null_rows))
    return null_columns_counts

null_columns_count_list = null_value_count(df)
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

                                                                                

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                  name|               85|
|          online_order|             8111|
|            book_table|                2|
|                  rate|             7775|
|                 votes|            20018|
|                 phone|             1227|
|              location|            20054|
|             rest_type|            20165|
|            dish_liked|            46841|
|              cuisines|            27305|
|  approx_cost(for t...|            43611|
|          reviews_list|            28185|
|             menu_item|            28611|
|       listed_in(type)|            28983|
|       listed_in(city)|            29344|
+----------------------+-----------------+



**Drop all missing data**

PySpark has a really handy .na attribute for working with missing data. Its drop method has the following parameters:

    df.na.drop(how='any', thresh=None, subset=None)

In [152]:
df.na.drop().limit(2).toPandas() 

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
1,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 9663487993,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...


In [153]:
# Of course you will want to know how many rows that affected before you actually execute it..
og_len = df.count()
drop_len = df.na.drop().count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)


Total Rows Dropped: 63124
Percentage of Rows Dropped 0.8800223058692318




In [154]:
# Keep only rows that have at least 8 NON-null values (drop rows with fewer than 8)
og_len = df.count()
drop_len = df.na.drop(thresh=8).count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 1694
Percentage of Rows Dropped 0.023616339049212325



In [155]:
# Only drop the rows whose values in the rate column are null
og_len = df.count()
drop_len = df.na.drop(subset=["rate"]).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 7775
Percentage of Rows Dropped 0.10839258329848041




In [156]:
# Another way to do the above
og_len = df.count()
drop_len = df.filter(df.rate.isNotNull()).count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 7775
Percentage of Rows Dropped 0.10839258329848041


In [157]:
# Drop a row only if ALL its values are null.
og_len = df.count()
drop_len = df.na.drop(how='all').count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 0
Percentage of Rows Dropped 0.0



### Fill the missing values

We can also fill the missing values with new values. If you have nulls across columns of multiple data types, Spark matches the fill value to the column type: a string fill value is applied only to string columns, and a numeric fill value only to numeric columns. For example:

In [159]:
# Fill all null values in string columns with one common value
df.na.fill('MISSING').limit(2).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775.0,080 42297555,MISSING,MISSING,MISSING,MISSING,,MISSING,MISSING,MISSING,MISSING
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,,('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0',,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...


In [161]:
# Fill all null values in numeric columns with one common value
df.na.fill(999).limit(2).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,,,,,999,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,999,('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0',999,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...


Usually you should specify which columns you want to fill with the subset parameter.

In [164]:
df.filter(df.name.isNull()).na.fill('No Name',subset=['name']).limit(1).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,"+91 9986692090""",BTM,No Name,"Momos, Oreo Shake","Mughlai, North Indian, Chinese, Momos",600,,('Rated 3.0','RATED\n Simple food with great north indian...,"['Fry Chicken Kabab [5 Pieces]', 'Fry Chicken ...",Delivery,Bannerghatta Road,,,,,


A very common practice is to fill missing values with the mean value of the column. Here is a fun function to do that in an automated way.

In [166]:
def fill_with_mean(df, include=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in include
    ))
#     stats = stats.select(*(col(c).cast("int").alias(c) for c in stats.columns)) #IntegerType()
    return df.na.fill(stats.first().asDict())

updated_df = fill_with_mean(df, ["approx_cost(for two people)","votes"])
updated_df.limit(2).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,,,,,387,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,283,('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0',387,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...


# <span class="burk">Manipulating Data in DataFrames</span>

Change data types

### Available types:
    - DataType
    - NullType
    - StringType
    - BinaryType
    - BooleanType
    - DateType
    - TimestampType
    - DecimalType
    - DoubleType
    - FloatType
    - ByteType
    - IntegerType
    - LongType
    - ShortType
    - ArrayType
    - MapType
    - StructField
    - StructType

**Spark Immutability**

Before we get started, let's first take a moment to discuss the concept of Spark's immutability. Spark DataFrames are immutable. What does that mean? Let's take a look at an example.

In [167]:
names = spark.createDataFrame([('Abraham','Lincoln')], ['first_name', 'last_name'])
print(names.show())
print(names.rdd.id())

+----------+---------+
|first_name|last_name|
+----------+---------+
|   Abraham|  Lincoln|
+----------+---------+

None
1127


Note the id of the DataFrame's underlying RDD.

Now add a column to the dataframe and keep calling it the same name.

In [168]:
# add a col
from pyspark.sql.functions import *
names = names.select(names.first_name,names.last_name,concat_ws(' ', names.first_name, names.last_name).alias('full_name'))

And see how the id changes even though the variable name is still the same: names now points to a new DataFrame rather than to a modified version of the old one.


In [170]:
print(names.show())
print(names.rdd.id())

+----------+---------+---------------+
|first_name|last_name|      full_name|
+----------+---------+---------------+
|   Abraham|  Lincoln|Abraham Lincoln|
+----------+---------+---------------+

None
1133


**Read in our YouTube Videos DataFrame**

In [5]:
path='../Datasets/'
videos = spark.read.csv(path+'youtubevideos.csv',inferSchema=True,header=True)



In [18]:
# Notice all vars are strings above....
# let's change that
from pyspark.sql.functions import * 
from pyspark.sql.types import * # IntegerType

df = videos.withColumn("views", videos["views"].cast(IntegerType())) \
        .withColumn("likes", videos["likes"].cast(IntegerType())) \
        .withColumn("dislikes", videos["dislikes"].cast(IntegerType())) \
        .withColumn("trending_date", to_date(videos.trending_date, 'dd.mm.yy'))
# publish_time needs cleaning before it can be converted (see the Clean Data section):
#         .withColumn("publish_time", to_timestamp(videos.publish_time, 'yyyy-MM-dd HH:mm:ss:ms'))
print(df.printSchema())
df.limit(1).toPandas()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)

None


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2011-01-17,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...


**Regex**

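Regular expressions (regex) let you replace or extract the substrings of a string column that match a pattern. For example, regexp_replace replaces every substring of str that matches pattern with replacement:

    regexp_replace(str, pattern, replacement)

For more info on regex syntax visit: https://docs.oracle.com/cd/B19306_01/server.102/b14200/ap_posix001.htm#BABJDBHB (note that Spark uses Java regular expression syntax, which differs from POSIX in some details).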

**Renaming Columns**

If you simply need to rename a column, you can use the withColumnRenamed method.

In [7]:
# Simple Rename
renamed = df.withColumnRenamed('channel_title','channel_title_new')
renamed.limit(2).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title_new,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2011-01-17,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2011-01-17,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."


**Clean Data**

Alright, so we see that the publish_time variable could not be converted to a timestamp because it has those strange "T" and "Z" values between the date and the time. We essentially need to replace the "T" value with a space, and the "Z" value with nothing. There are a couple of ways we can do this; the first is regex, which is short for regular expressions.


In [8]:
from pyspark.sql.functions import regexp_replace, regexp_extract, to_timestamp
import pyspark.sql.functions as f

# Replace the "T" separator with a space and strip the trailing "Z"
df = df.withColumn('publish_time_2',regexp_replace(df.publish_time, 'T', ' '))
df = df.withColumn('publish_time_2',regexp_replace(df.publish_time_2, 'Z', ''))
df = df.withColumn("publish_time_3", to_timestamp(df.publish_time_2, 'yyyy-MM-dd HH:mm:ss.SSS'))
#print(df.printSchema())
df.select("publish_time", "publish_time_2","publish_time_3").show(5,False)
# Notice the .000 on the end of publish_time_2 (a string) as opposed to publish_time_3 (a timestamp)

+------------------------+-----------------------+-------------------+
|publish_time            |publish_time_2         |publish_time_3     |
+------------------------+-----------------------+-------------------+
|2017-11-13T17:13:01.000Z|2017-11-13 17:13:01.000|2017-11-13 17:13:01|
|2017-11-13T07:30:00.000Z|2017-11-13 07:30:00.000|2017-11-13 07:30:00|
|2017-11-12T19:05:24.000Z|2017-11-12 19:05:24.000|2017-11-12 19:05:24|
|2017-11-13T11:00:04.000Z|2017-11-13 11:00:04.000|2017-11-13 11:00:04|
|2017-11-12T18:01:41.000Z|2017-11-12 18:01:41.000|2017-11-12 18:01:41|
+------------------------+-----------------------+-------------------+
only showing top 5 rows



**Translate Function**

You could also use the translate function here, which replaces characters one for one: each character in the first set is replaced by the character at the same position in the second set.

In [9]:
# You can also use the translate function for cases like this 
# where you wanted to replace ('$', '#', ',') with ('X', 'Y', 'Z')
import pyspark.sql.functions as f
foobar = spark.createDataFrame([("$100,00",),("#foobar",),("foo, bar, #, and $",)], ["A"])
foobar.select("A", f.translate(f.col("A"), "$#,", "XYZ").alias("replaced")).show()

+------------------+------------------+
|                 A|          replaced|
+------------------+------------------+
|           $100,00|           X100Z00|
|           #foobar|           Yfoobar|
|foo, bar, #, and $|fooZ barZ YZ and X|
+------------------+------------------+




**Trim**

In [10]:
# Trim
# pyspark.sql.functions.trim(col) - Trim the spaces from both ends for the specified string column.
from pyspark.sql.functions import *

trim_ex = spark.createDataFrame([(' 2015-04-08 ',' 2015-05-10 ')], ['d1', 'd2']) # create a dataframe - notice the extra whitespaces in the date strings
trim_ex.show()
print("left trim")
trim_ex.select('d1', ltrim(trim_ex.d1)).show()
print("right trim")
trim_ex.select('d1', rtrim(trim_ex.d1)).show()
print("trim")
trim_ex.select('d1', trim(trim_ex.d1)).show()

+------------+------------+
|          d1|          d2|
+------------+------------+
| 2015-04-08 | 2015-05-10 |
+------------+------------+

left trim
+------------+-----------+
|          d1|  ltrim(d1)|
+------------+-----------+
| 2015-04-08 |2015-04-08 |
+------------+-----------+

right trim
+------------+-----------+
|          d1|  rtrim(d1)|
+------------+-----------+
| 2015-04-08 | 2015-04-08|
+------------+-----------+

trim
+------------+----------+
|          d1|  trim(d1)|
+------------+----------+
| 2015-04-08 |2015-04-08|
+------------+----------+



**Case When**

In [11]:
# Use a separate name so the videos DataFrame in df is not overwritten
case_df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])

print("Sample Dataframe:")
case_df.show()

print("Option#1: withColumn() using when-otherwise")
from pyspark.sql.functions import when
case_df.withColumn("value_desc",when(case_df.value == 1, 'one').when(case_df.value == 2, 'two').otherwise('other')).show()

print("Option2: withColumn() using expr function")
from pyspark.sql.functions import expr 
case_df.withColumn("value_desc",expr("CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc")).show()

print("Option 3: selectExpr() using SQL equivalent CASE expression")
case_df.selectExpr("*","CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc").show()

print("Option 4: select() using expr function")
case_df.select("*",expr("CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc")).show()

Sample Dataframe:
+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  2|    2|
|  3|    3|
+---+-----+

Option#1: withColumn() using when-otherwise
+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+

Option2: withColumn() using expr function
+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+

Option 3: selectExpr() using SQL equivalent CASE expression
+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+

Option 4: select() using expr function
+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+



**Lower**

Another common data cleaning technique is lowercasing all values in a string column. Here's how we could do that.

In [19]:
df = df.withColumn('title',lower(df.title)) # upper, trim, ltrim and rtrim are applied the same way
df.select("title").show(5,False)

+--------------------------------------------------------------+
|title                                                         |
+--------------------------------------------------------------+
|we want to talk about our marriage                            |
|the trump presidency: last week tonight with john oliver (hbo)|
|racist superman | rudy mancuso, king bach & lele pons         |
|nickelback lyrics: real or fake?                              |
|i dare you: going bald!?                                      |
+--------------------------------------------------------------+
only showing top 5 rows



**Concatenate**

If you want to combine two or more string columns (with a given separator), you can use the concat_ws function. Let's say we wanted to combine all the text variables of the videos for a robust NLP exercise of some sort, and we needed to have all the text in one column, like this.

    concat_ws(sep, *cols)

In [21]:
from pyspark.sql.functions import concat_ws
df.select(concat_ws(' ', df.title,df.channel_title,df.tags).alias('text')).show(1,False)

+---------------------------------------------------------------+
|text                                                           |
+---------------------------------------------------------------+
|we want to talk about our marriage CaseyNeistat SHANtell martin|
+---------------------------------------------------------------+
only showing top 1 row



**Extracting data from Date and Timestamp variables**

If you need to extract, say, the year or month from a date field, you can use PySpark's SQL function library like this.

Note that with this analysis we stumbled upon a date conversion discrepancy: the trending_date values shown below all come out as 2011-01-17. I'll leave fixing that as a homework problem! (Hint: date pattern letters are case sensitive.)

In [22]:
from pyspark.sql.functions import year, month
# Other options: dayofmonth, dayofweek, dayofyear, weekofyear
df.select("trending_date",year("trending_date"),month("trending_date")).show(5)

+-------------+-------------------+--------------------+
|trending_date|year(trending_date)|month(trending_date)|
+-------------+-------------------+--------------------+
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
+-------------+-------------------+--------------------+
only showing top 5 rows



**Calculate the Difference between two dates**

If you want to calculate the time difference between two dates, you can use PySpark's datediff function, which returns the number of days from start to end.

    datediff(end, start)

In [29]:
# Calculate the difference between two dates:
# pyspark.sql.functions.datediff(end, start)
# Returns the number of days from start to end.
from pyspark.sql.functions import datediff

date_df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
date_df.select(datediff(date_df.d2, date_df.d1).alias('difference is:')).show()

+--------------+
|difference is:|
+--------------+
|            32|
+--------------+



In [None]:
from pyspark.sql.functions import datediff
df.select("trending_date","publish_time_3",(datediff(df.trending_date,df.publish_time_3)/365).alias('diff')).show(5)

**Split a string around a pattern**

If you ever need to split a string on a pattern (where the pattern is a regex), you can use PySpark's split function. You could use this for tokenizing text, an NLP step that we'll get into later.

    df.select(split(str, pattern))
    
*Note that this will create an array*

In [27]:
# Split a string around pattern (pattern is a regular expression).
from pyspark.sql.functions import *
# pyspark.sql.functions.split(str, pattern)

abc = spark.createDataFrame([('ab12cd',)], ['s',])
abc.select(abc.s,split(abc.s, '[0-9]+').alias('news')).show()

+------+--------+
|     s|    news|
+------+--------+
|ab12cd|[ab, cd]|
+------+--------+



In [24]:
# Split a string around pattern (pattern is a regular expression).
from pyspark.sql.functions import split

df.select("title").show(1,False)
df.select(split(df.title, ' ').alias('new')).show(1,False)

+----------------------------------+
|title                             |
+----------------------------------+
|we want to talk about our marriage|
+----------------------------------+
only showing top 1 row

+------------------------------------------+
|new                                       |
+------------------------------------------+
|[we, want, to, talk, about, our, marriage]|
+------------------------------------------+
only showing top 1 row



**Working with Arrays**

    df.select(array_contains(df.variable, "marriage"))

*Note: array_contains is available in older versions, but most of the other array functions listed below require PySpark 2.4+*
    

 - .array(*cols)   -   Creates a new array column.
 - .array_contains(col, value)  - Collection function: returns null if the array is null, true if the array contains the given value, and false otherwise.
 - .array_distinct(col) - Collection function: removes duplicate values from the array.
 - .array_except(col1, col2) - Collection function: returns an array of the elements in col1 but not in col2, without duplicates.
 - .array_intersect(col1, col2) - Collection function: returns an array of the elements in the intersection of col1 and col2, without duplicates.
 - .array_join(col, delimiter, null_replacement=None) - Concatenates the elements of column using the delimiter. Null values are replaced with null_replacement if set, otherwise they are ignored.
 - .array_max(col) - Collection function: returns the maximum value of the array.
 - .array_min(col) - Collection function: returns the minimum value of the array.
 - .array_position(col, value) - Collection function: Locates the position of the first occurrence of the given value in the given array. Returns null if either of the arguments are null.
 - .array_remove(col, element)- Collection function: Remove all elements that equal to element from the given array.
 - .array_repeat(col, count) - Collection function: creates an array containing a column repeated count times.
 - .array_sort(col) - Collection function: sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.
 - .array_union(col1, col2) - Collection function: returns an array of the elements in the union of col1 and col2, without duplicates.
 - .arrays_overlap(a1, a2) - Collection function: returns true if the arrays contain any common non-null element; if not, returns null if both the arrays are non-empty and any of them contains a null element; returns false otherwise.
 - .arrays_zip(*cols) - Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

In [30]:

customer = spark.createDataFrame([('coffee','milk','coffee','coffee','chocolate','')], ['item1', 'item2','item3','item4','item5','item6'])
purchases = customer.select(array('item1', 'item2','item3').alias("Monday"),\
                            array('item4', 'item5','item6').alias("Tuesday"))

print("array")
purchases.show()

print("Which customers purchased milk? array_contains")
purchases.select(array_contains(purchases.Monday, "milk")).show(1, False)

print("List of unique products purchased on Monday: array_distinct")
purchases.select(array_distinct(purchases.Monday)).show(1, False)

print("What did our customers order on Monday but not Tuesday? array_except")
purchases.select(array_except(purchases.Monday, purchases.Tuesday)).show(1, False)

print("What did our customers order on BOTH Monday and Tuesday?: array_intersect")
purchases.select(array_intersect(purchases.Monday, purchases.Tuesday)).show(1, False)

print("All purchases on monday in a string: array_join")
purchases.select(array_join(purchases.Monday, ',')).show(1, False)

array
+--------------------+--------------------+
|              Monday|             Tuesday|
+--------------------+--------------------+
|[coffee, milk, co...|[coffee, chocolat...|
+--------------------+--------------------+

Which customers purchased milk? array_contains
+----------------------------+
|array_contains(Monday, milk)|
+----------------------------+
|true                        |
+----------------------------+

List of unique products purchased on Monday: array_distinct
+----------------------+
|array_distinct(Monday)|
+----------------------+
|[coffee, milk]        |
+----------------------+

What did our customers order on Monday but not Tuesday? array_except
+-----------------------------+
|array_except(Monday, Tuesday)|
+-----------------------------+
|[milk]                       |
+-----------------------------+

What did our customers order on BOTH Monday and Tuesday?: array_intersect
+--------------------------------+
|array_intersect(Monday, Tuesday)|
+---------

In [26]:
from pyspark.sql.functions import *

array_df = df.select("title",split(df.title, ' ').alias('title_array'))

array_df.select("title",array_contains(array_df.title_array, "marriage")).show(1, False)

#get rid of repeat values
array_df.select(array_distinct(array_df.title_array)).show(1, False)

# Remove certain values
array_df.select(array_remove(array_df.title_array, "we")).show(1, False)

+----------------------------------+-------------------------------------+
|title                             |array_contains(title_array, marriage)|
+----------------------------------+-------------------------------------+
|we want to talk about our marriage|true                                 |
+----------------------------------+-------------------------------------+
only showing top 1 row

+------------------------------------------+
|array_distinct(title_array)               |
+------------------------------------------+
|[we, want, to, talk, about, our, marriage]|
+------------------------------------------+
only showing top 1 row

+--------------------------------------+
|array_remove(title_array, we)         |
+--------------------------------------+
|[want, to, talk, about, our, marriage]|
+--------------------------------------+
only showing top 1 row



## Creating Functions

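Functions as you know them in Python work a bit differently in PySpark because the data lives on a cluster. If you call a plain Python function on a DataFrame column, it receives a Column object rather than the individual values, so ordinary Python logic will not run row by row.

To apply your own Python logic to each value, you convert the function into what's called a user defined function (UDF). Keep in mind that a UDF usually runs slower than an equivalent built-in function, so prefer the built-ins when one exists. This is how you create one.

*Note: keep in mind that null values reach the function as None, so a function like the one below will fail on a column with null values unless you filter them out first.*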

In [32]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def square(x):
    return int(x**2)

# Wrap the plain Python function as a UDF and declare its return type
square_udf = udf(square, IntegerType())

# Filter out nulls, because square() cannot handle None
df.select('dislikes',square_udf('dislikes').alias('likes_sq')).where(col('dislikes').isNotNull()).show()

+--------+--------+
|dislikes|likes_sq|
+--------+--------+
|    2966| 8797156|
|    6146|37773316|
|    5339|28504921|
|     666|  443556|
|    1989| 3956121|
|     511|  261121|
|    2445| 5978025|
|     778|  605284|
|     119|   14161|
|    1363| 1857769|
|      25|     625|
|     303|   91809|
|    1333| 1776889|
|    1171| 1371241|
|     246|   60516|
|      52|    2704|
|     638|  407044|
|      53|    2809|
|      36|    1296|
|     191|   36481|
+--------+--------+
only showing top 20 rows



In [33]:
from pyspark.sql.functions import *
values = [(45,'I like to ride bikes'), \
          (14,'I like chicken'), \
          (63,'I like bubbles'), \
          (75,'I like roller coasters'), \
          (24,'I like shuffle board'), \
          (45,'I like to swim')]
sentences = spark.createDataFrame(values,['age', 'sentence'])
df = sentences.withColumn("array", split(col("sentence"), " "))
df.show(1,False)

+---+--------------------+--------------------------+
|age|sentence            |array                     |
+---+--------------------+--------------------------+
|45 |I like to ride bikes|[I, like, to, ride, bikes]|
+---+--------------------+--------------------------+
only showing top 1 row

