# 1. Reading, Writing and Validating Data in PySpark HW Solutions



In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Next let's start by reading a basic csv dataset

In [9]:
path ="Datasets/"

# Some csv data
pga = spark.read.csv(path+'pga_tour_historical.csv',inferSchema=True,header=True)

## 1. View first 5 lines of dataframe
First generate a view of the first 5 lines of the dataframe to get an idea of what is inside. We went over two ways of doing this... see if you can remember BOTH ways. 

In [3]:
pga.show(3)

+---------------+------+----------------+--------------------+-----+
|    Player Name|Season|       Statistic|            Variable|Value|
+---------------+------+----------------+--------------------+-----+
|Robert Garrigus|  2010|Driving Distance|Driving Distance ...|   71|
|   Bubba Watson|  2010|Driving Distance|Driving Distance ...|   77|
| Dustin Johnson|  2010|Driving Distance|Driving Distance ...|   83|
+---------------+------+----------------+--------------------+-----+
only showing top 3 rows



In [4]:
# I prefer this method
pga.limit(5).toPandas()

Unnamed: 0,Player Name,Season,Statistic,Variable,Value
0,Robert Garrigus,2010,Driving Distance,Driving Distance - (ROUNDS),71
1,Bubba Watson,2010,Driving Distance,Driving Distance - (ROUNDS),77
2,Dustin Johnson,2010,Driving Distance,Driving Distance - (ROUNDS),83
3,Brett Wetterich,2010,Driving Distance,Driving Distance - (ROUNDS),54
4,J.B. Holmes,2010,Driving Distance,Driving Distance - (ROUNDS),100


## 2. Print the schema details

Now print the details of the dataframes schema that Spark infered to ensure that it was infered correctly. Sometimes it is not infered correctly, so we need to watch out!

In [5]:
print(pga.printSchema())
print("")
print(pga.columns)
print("")
# Not so fond of this method, but to each their own
print(pga.describe())

root
 |-- Player Name: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Statistic: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: string (nullable = true)

None

['Player Name', 'Season', 'Statistic', 'Variable', 'Value']

DataFrame[summary: string, Player Name: string, Season: string, Statistic: string, Variable: string, Value: string]


## 3. Edit the schema during the read in


In [4]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType

In [5]:
data_schema = [StructField("Player Name", StringType(), True), \
               StructField("Season", IntegerType(), True), \
               StructField("Statistic", StringType(), True), \
               StructField("Variable", StringType(), True), \
               StructField("Value", IntegerType(), True)]

In [10]:
final_struc = StructType(fields=data_schema)

In [11]:
path ="Datasets/"
pga = spark.read.csv(path+'pga_tour_historical.csv', schema=final_struc)

In [12]:
pga.printSchema()
# That's better!

root
 |-- Player Name: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Statistic: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: integer (nullable = true)



## 4. Generate summary statistics for only one variable


In [11]:
# Neat "describe" function
pga.describe(['Value']).show()

+-------+------------------+
|summary|             Value|
+-------+------------------+
|  count|           1657247|
|   mean|12494.388998743096|
| stddev| 157274.7567357075|
|    min|              -178|
|    max|           3564954|
+-------+------------------+



## 5. Generate summary statistics for TWO variables


In [12]:
pga.select("Season", "Value").summary("count", "min", "max").show()

+-------+-------+-------+
|summary| Season|  Value|
+-------+-------+-------+
|  count|1700745|1657247|
|    min|   2010|   -178|
|    max|   2018|3564954|
+-------+-------+-------+



## 6. Write a parquet file

In [14]:
df = pga.select("Season","Value")
df.write.mode("overwrite").parquet("partition_parquet/")

## 7. Write a partioned parquet file


In [15]:
df.write.mode("overwrite").partitionBy("Season").parquet("partitioned_parquet/")
df.show(5)

+------+-----+
|Season|Value|
+------+-----+
|  null| null|
|  2010|   71|
|  2010|   77|
|  2010|   83|
|  2010|   54|
+------+-----+
only showing top 5 rows



## 8. Read in a partitioned parquet file

Now try reading in the partitioned parquet file you just created above. 

In [16]:
path = "partitioned_parquet/" #Note: if you add a * to the end of the path, the Season var will be automatically dropped
parquet = spark.read.parquet(path)
        
parquet.show()

+-----+------+
|Value|Season|
+-----+------+
|   79|  2010|
|   82|  2010|
|   93|  2010|
|   74|  2010|
|   85|  2010|
|   57|  2010|
|   78|  2010|
|   79|  2010|
|   77|  2010|
|   78|  2010|
|   98|  2010|
|   90|  2010|
|   77|  2010|
|  104|  2010|
|   74|  2010|
|   87|  2010|
|   84|  2010|
|   81|  2010|
|   99|  2010|
|   89|  2010|
+-----+------+
only showing top 20 rows



## 9. Reading in a set of paritioned parquet files

Now try only reading Seasons 2010, 2011 and 2012.

In [17]:
# Notice that this method only gives you the "Value" column
path = "partitioned_parquet/"
partitioned = spark.read.parquet(path+'Season=2010/',\
                             path+'Season=2011/', \
                             path+'Season=2012/')

partitioned.show(5)

+-----+
|Value|
+-----+
|   79|
|   82|
|   93|
|   74|
|   85|
+-----+
only showing top 5 rows



In [18]:
# We need to use this method to get the "Season" and "Value" Columns
path = "partitioned_parquet/"
dataframe = spark.read.option("basePath", path).parquet(path+'Season=2010/',\
                                                                path+'Season=2011/', \
                                                                path+'Season=2012/')
dataframe.show(5)

+-----+------+
|Value|Season|
+-----+------+
|   79|  2010|
|   82|  2010|
|   93|  2010|
|   74|  2010|
|   85|  2010|
+-----+------+
only showing top 5 rows



## 10. Create your own dataframe


In [19]:
values = [('Kyle',10,'A',1),('Melbourne',36,'A',1),('Nina',123,'A',1),('Stephen',48,'B',2),('Orphan',16,'B',2),('Imran',1,'B',2)]
df = spark.createDataFrame(values,['name','age','AB','Number'])
df.show()

+---------+---+---+------+
|     name|age| AB|Number|
+---------+---+---+------+
|     Kyle| 10|  A|     1|
|Melbourne| 36|  A|     1|
|     Nina|123|  A|     1|
|  Stephen| 48|  B|     2|
|   Orphan| 16|  B|     2|
|    Imran|  1|  B|     2|
+---------+---+---+------+



# 2. Manipulating Data in DataFrames Solutions

In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Manip").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in our Republican vs. Democrats Tweet DataFrame

In [2]:
path='Datasets/'
tweets = spark.read.csv(path+'Rep_vs_Dem_tweets.csv',inferSchema=True,header=True)

In [3]:
tweets.limit(4).toPandas()

Unnamed: 0,Party,Handle,Tweet
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P..."
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...
3,"Congress has allocated about $18…""",,


**Prevent Truncation of view**

If the view you produced above truncated some of the longer tweets, see if you can prevent that so you can read the whole tweet.

In [49]:
tweets.select("tweet").show(3,False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… https://t.co/n3tggDLU1L |
|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia…|
|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------+

As we can see, this dataset contains three columns. The tweet content, Twitter handle that tweeted the tweet, and the party that that tweet belongs to. But it looks like the tweets could use some cleaning, esspecially if we are going to this for some kind of machine learning analysis. Let's see if we can make this an even richer dataset using the techniques we learned in the lecture!

**Print Schema**

First, check the schema to make sure the datatypes are accurate. 

In [4]:
print(tweets.printSchema())

root
 |-- Party: string (nullable = true)
 |-- Handle: string (nullable = true)
 |-- Tweet: string (nullable = true)

None


## 1. Can you identify any tweet that mentions the handle @LatinoLeader using regexp_extract?

It doesn't matter how you identify the row, any identifier will do. You can test your script on row 5 from this dataset. That row contains @LatinoLeader. 

In [76]:
from pyspark.sql.functions import * #regexp_extract
latino = tweets.withColumn('Latino_Mentions',regexp_extract(tweets.Tweet, '(.)(@LatinoLeader)(.)',2))
latino.limit(6).toPandas()

Unnamed: 0,Party,Handle,Tweet,Latino_Mentions
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P...",
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...,
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...,
3,"Congress has allocated about $18…""",,,
4,Democrat,RepDarrenSoto,RT @NALCABPolicy: Meeting with @RepDarrenSoto ...,@LatinoLeader
5,Democrat,RepDarrenSoto,RT @Vegalteno: Hurricane season starts on June...,


## 2. Replace any value other than 'Democrate' or 'Republican' with 'Other' in the Party column.

We can see from the output below, that there are several other values other than 'Democrate' or 'Republican' in the Part column. We are assuming that this is dirty data that needs to be cleaned up.

In [4]:
# We haven't gotten to this yet so it's a bit of a teaser :)
from pyspark.sql.functions import *
counts = tweets.groupBy("Party").count()
counts.orderBy(desc("count")).show(6)

+--------------------+-----+
|               Party|count|
+--------------------+-----+
|          Republican|44392|
|            Democrat|42068|
|            That’s…"|   28|
|https://t.co/oc6J...|   22|
|                 Now|   17|
|               Today|   13|
+--------------------+-----+
only showing top 6 rows



In [47]:
from pyspark.sql.functions import when

clean = tweets.withColumn('Party', when(tweets.Party == 'Democrat', 'Democrat').when(tweets.Party == 'Republican', 'Republican').otherwise('Other'))
counts = clean.groupBy("Party").count()
counts.orderBy(desc("count")).show(16)

+----------+-----+
|     Party|count|
+----------+-----+
|Republican|44392|
|  Democrat|42068|
|     Other| 6029|
+----------+-----+



## 3. Delete all embedded links (ie. "https:....)

For example see the first row in the tweets dataframe. 

*Note: this may require an google search :)*

In [73]:
print("OG Tweet")
tweets.select("tweet").show(1,False)

OG Tweet
+-------------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… https://t.co/n3tggDLU1L|
+-------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row

Cleaned Tweet
+--------------------------------------------------------------------------------------------------------------------+
|cleaned                                                                                                             |
+----

In [39]:
# And here is the solution
print("Cleaned Tweet")
tweets.withColumn('cleaned', regexp_replace('Tweet', '(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?', '')).select("cleaned").show(1,False)

Cleaned Tweet
+--------------------------------------------------------------------------------------------------------------------+
|cleaned                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… |
+--------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



## 4. Remove any leading or trailing white space in the tweet column

In [4]:
from pyspark.sql.functions import *
tweets.select("Tweet").show(5,False)
tweets.select('Tweet', trim(tweets.Tweet)).show(5,False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|Tweet                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… https://t.co/n3tggDLU1L |
|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia…|
|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |
|null                                                                                                                                        |

## 5. Rename the 'Party' column to 'Dem_Rep'

No real reason here :) just wanted you to get practice doing this. 

In [75]:
renamed = tweets.withColumnRenamed('Party','Dem_Rep')
renamed.limit(4).toPandas()

Unnamed: 0,Dem_Rep,Handle,Tweet
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P..."
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...
3,"Congress has allocated about $18…""",,


## 6. Concatenate the Party and Handle columns

Silly yes... but good practice.

pyspark.sql.functions.concat_ws(sep, *cols)[source] <br>
Concatenates multiple input string columns together into a single string column, using the given separator.

In [5]:
from pyspark.sql.functions import *
tweets.select(tweets.Party,tweets.Handle,concat_ws(' ', tweets.Party, tweets.Handle).alias('Concatenated')).show(5,False)

+--------------------+-------------+--------------------+
|               Party|       Handle|        Concatenated|
+--------------------+-------------+--------------------+
|            Democrat|RepDarrenSoto|Democrat RepDarre...|
|            Democrat|RepDarrenSoto|Democrat RepDarre...|
|            Democrat|RepDarrenSoto|Democrat RepDarre...|
|Congress has allo...|         null|Congress has allo...|
|            Democrat|RepDarrenSoto|Democrat RepDarre...|
+--------------------+-------------+--------------------+
only showing top 5 rows



## Challenge Question

Let's image that we want to analyze the hashtags that are used in these tweets. Can you extract all the hashtags you see?

In [38]:
from pyspark.sql.functions import *
# Parenthesis are used to mark a subexpression within a larger expression
# The . matches any character other than a new line
# | means is like or
# \w+ means followed by any word
pattern = '(.|'')(#)(\w+)'
# * is used to match the preceding character zero or more times.
# ? will match the preceding character zero or one times, but no more.
# $ is used to match the ending position in a string. 
split_pattern = r'.*?({pattern})'.format(pattern=pattern)
end_pattern = r'(.*{pattern}).*?$'.format(pattern=pattern)

# $1 here means to capture the first part of the regex result
# The , will separate each find with a comma in the a array we create
df2 = tweets.withColumn('a', regexp_replace('Tweet', split_pattern, '$1,')).where(col('Tweet').like('%#%'))
df2.select('a').show(3,False)
# Remove all the other results that came up
df3 = df2.withColumn('a', regexp_replace('a', end_pattern, '$1'))
df3.select('a').show(3,False)
# Finally create an array from the result by splitting on the comma
df4 = df3.withColumn('a', split('a', r','))
df4.select('a').show(3,False)
df4.limit(3).toPandas()

+-----------------------------------------------------------------------------------------+
|a                                                                                        |
+-----------------------------------------------------------------------------------------+
| #SaveTheInternet, #NetNeutrality, legislation here in the House… https://t.co/n3tggDLU1L|
| #NALCABPolicy2018,.…                                                                    |
| #NetNeutrality, rules. Find out…                                                        |
+-----------------------------------------------------------------------------------------+
only showing top 3 rows

+---------------------------------+
|a                                |
+---------------------------------+
| #SaveTheInternet, #NetNeutrality|
| #NALCABPolicy2018               |
| #NetNeutrality                  |
+---------------------------------+
only showing top 3 rows

+------------------------------------+
|a             

Unnamed: 0,Party,Handle,Tweet,a
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P...","[ #SaveTheInternet, #NetNeutrality]"
1,Democrat,RepDarrenSoto,RT @NALCABPolicy: Meeting with @RepDarrenSoto ...,[ #NALCABPolicy2018]
2,Democrat,RepDarrenSoto,RT @Tharryry: I am delighted that @RepDarrenSo...,[ #NetNeutrality]


# Let's create our own dataset to work with real dates

This is a dataset of patient visits from a medical office. It contains the patients first and last names, date of birth, and the dates of their first 3 visits. 

In [25]:
from pyspark.sql.types import *

md_office = [('Mohammed','Alfasy','1987-4-8','2016-1-7','2017-2-3','2018-3-2') \
            ,('Marcy','Wellmaker','1986-4-8','2015-1-7','2017-1-3','2018-1-2') \
            ,('Ginny','Ginger','1986-7-10','2014-8-7','2015-2-3','2016-3-2') \
            ,('Vijay','Doberson','1988-5-2','2016-1-7','2018-2-3','2018-3-2') \
            ,('Orhan','Gelicek','1987-5-11','2016-5-7','2017-1-3','2018-9-2') \
            ,('Sarah','Jones','1956-7-6','2016-4-7','2017-8-3','2018-10-2') \
            ,('John','Johnson','2017-10-12','2018-1-2','2018-10-3','2018-3-2') ]

df = spark.createDataFrame(md_office,['first_name','last_name','dob','visit1','visit2','visit3']) # schema=final_struc

# Check to make sure it worked
df.show()
print(df.printSchema())

+----------+---------+----------+--------+---------+---------+
|first_name|last_name|       dob|  visit1|   visit2|   visit3|
+----------+---------+----------+--------+---------+---------+
|  Mohammed|   Alfasy|  1987-4-8|2016-1-7| 2017-2-3| 2018-3-2|
|     Marcy|Wellmaker|  1986-4-8|2015-1-7| 2017-1-3| 2018-1-2|
|     Ginny|   Ginger| 1986-7-10|2014-8-7| 2015-2-3| 2016-3-2|
|     Vijay| Doberson|  1988-5-2|2016-1-7| 2018-2-3| 2018-3-2|
|     Orhan|  Gelicek| 1987-5-11|2016-5-7| 2017-1-3| 2018-9-2|
|     Sarah|    Jones|  1956-7-6|2016-4-7| 2017-8-3|2018-10-2|
|      John|  Johnson|2017-10-12|2018-1-2|2018-10-3| 2018-3-2|
+----------+---------+----------+--------+---------+---------+

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- visit1: string (nullable = true)
 |-- visit2: string (nullable = true)
 |-- visit3: string (nullable = true)

None


Ooops! The dates are still stored as text... let's try converting them again and see if we have any issues this time.

In [24]:
# Covert the date columns into date types
df = df.withColumn("dob", df["dob"].cast(DateType())) \
        .withColumn("visit1", df["visit1"].cast(DateType())) \
        .withColumn("visit2", df["visit2"].cast(DateType())) \
        .withColumn("visit3", df["visit3"].cast(DateType()))

# Check to make sure it worked
df.show()
print(df.printSchema())

+----------+---------+----------+----------+----------+----------+------+------+------+
|first_name|last_name|       dob|    visit1|    visit2|    visit3|Month1|Month2|Month3|
+----------+---------+----------+----------+----------+----------+------+------+------+
|  Mohammed|   Alfasy|1987-04-08|2016-01-07|2017-02-03|2018-03-02|     1|     2|     3|
|     Marcy|Wellmaker|1986-04-08|2015-01-07|2017-01-03|2018-01-02|     1|     1|     1|
|     Ginny|   Ginger|1986-07-10|2014-08-07|2015-02-03|2016-03-02|     8|     2|     3|
|     Vijay| Doberson|1988-05-02|2016-01-07|2018-02-03|2018-03-02|     1|     2|     3|
|     Orhan|  Gelicek|1987-05-11|2016-05-07|2017-01-03|2018-09-02|     5|     1|     9|
|     Sarah|    Jones|1956-07-06|2016-04-07|2017-08-03|2018-10-02|     4|     8|    10|
|      John|  Johnson|2017-10-12|2018-01-02|2018-10-03|2018-03-02|     1|    10|     3|
+----------+---------+----------+----------+----------+----------+------+------+------+

root
 |-- first_name: string (n

# 7. Can you calculate a variable showing the length of time between patient visits?

Compare visit1 to visit2 and visit2 to visit3 for all patients and see what the average length of time is between visits. Create an alias for it as well. 

In [8]:
from pyspark.sql.functions import *
diff1 = df.select(datediff(df.visit2, df.visit1).alias("diff"))
diff2 = df.select(datediff(df.visit3, df.visit2).alias("diff"))

# Append the two dataframes together
diff_combo = diff1.union(diff2)
diff_combo.show(5)

+----+
|diff|
+----+
| 393|
| 727|
| 180|
| 758|
| 241|
+----+
only showing top 5 rows



# 8. Can you calculate the age of each patient?

In [9]:
# We use the datediff function here as well
# And divide by 365 to get the age
# I also formated this value to get rid of all the decimal places
ages = df.select(format_number(datediff(df.visit1,df.dob)/365,1).alias("age"))
ages.show()

+----+
| age|
+----+
|28.8|
|28.8|
|28.1|
|27.7|
|29.0|
|59.8|
| 0.2|
+----+



## 9. Can you extract the month from the first visit column and call it "Month"?

In [7]:
month1 = df.select(month(df['visit1']).alias("Month"))
month1.show(3)

+-----+
|Month|
+-----+
|    1|
|    1|
|    8|
+-----+
only showing top 3 rows



In [14]:
# Bonus (not in lecture)
# If you wanted to make a list (or an array in this case) for all months, you could do this

df.select(array(month(df['visit1']),month(df['visit2'])).alias("Months")).show(3)

+------+
|Months|
+------+
|[1, 2]|
|[1, 1]|
|[8, 2]|
+------+
only showing top 3 rows



In [15]:
# Bonus (not in lecture)
# Or even a separate col for each month

df.select('*',month(df['visit1']).alias("Month1"),month(df['visit2']).alias("Month2")).show(3)

+----------+---------+----------+----------+----------+----------+------+------+
|first_name|last_name|       dob|    visit1|    visit2|    visit3|Month1|Month2|
+----------+---------+----------+----------+----------+----------+------+------+
|  Mohammed|   Alfasy|1987-04-08|2016-01-07|2017-02-03|2018-03-02|     1|     2|
|     Marcy|Wellmaker|1986-04-08|2015-01-07|2017-01-03|2018-01-02|     1|     1|
|     Ginny|   Ginger|1986-07-10|2014-08-07|2015-02-03|2016-03-02|     8|     2|
+----------+---------+----------+----------+----------+----------+------+------+
only showing top 3 rows



In [28]:
# Bonus (not in lecture)
# Or loop over all visit columns

# Get all visit column names
df_month_cols = [i for i in df.columns if i.startswith('visit')]

# Make a copy of our df
df2 = df

# Loop over relevant columns and add on month columns
for column in df_month_cols:
    # Find number of visit (this is straght up python, we don't need pyspark for this)
    num = str(column)[-1]
    # Create the naming convention for the new column  (python too)
    new_col_name = "Month" + num
    df2 = df2.withColumn(new_col_name,month(df[column]))
df2.show()

+----------+---------+----------+--------+---------+---------+------+------+------+
|first_name|last_name|       dob|  visit1|   visit2|   visit3|Month1|Month2|Month3|
+----------+---------+----------+--------+---------+---------+------+------+------+
|  Mohammed|   Alfasy|  1987-4-8|2016-1-7| 2017-2-3| 2018-3-2|     1|     2|     3|
|     Marcy|Wellmaker|  1986-4-8|2015-1-7| 2017-1-3| 2018-1-2|     1|     1|     1|
|     Ginny|   Ginger| 1986-7-10|2014-8-7| 2015-2-3| 2016-3-2|     8|     2|     3|
|     Vijay| Doberson|  1988-5-2|2016-1-7| 2018-2-3| 2018-3-2|     1|     2|     3|
|     Orhan|  Gelicek| 1987-5-11|2016-5-7| 2017-1-3| 2018-9-2|     5|     1|     9|
|     Sarah|    Jones|  1956-7-6|2016-4-7| 2017-8-3|2018-10-2|     4|     8|    10|
|      John|  Johnson|2017-10-12|2018-1-2|2018-10-3| 2018-3-2|     1|    10|     3|
+----------+---------+----------+--------+---------+---------+------+------+------+



## 10. Challenges with working with date and timestamps

Let's read in our supermarket sales dataframe and see some of the issues that can come up when working with date and timestamps values.

In [3]:
path = 'Datasets/'
sales = spark.read.csv(path+'supermarket_sales.csv',inferSchema=True,header=True)

In [4]:
sales.limit(6).toPandas()

Unnamed: 0,Invoice ID,Branch,City,Customer type,Gender,Product line,Unit price,Quantity,Tax 5%,Total,Date,Time,Payment,cogs,gross margin percentage,gross income,Rating
0,750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,548.9715,1/5/2019,13:08,Ewallet,522.83,4.761905,26.1415,9.1
1,226-31-3081,C,Naypyitaw,Normal,Female,Electronic accessories,15.28,5,3.82,80.22,3/8/2019,10:29,Cash,76.4,4.761905,3.82,9.6
2,631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,340.5255,3/3/2019,13:23,Credit card,324.31,4.761905,16.2155,7.4
3,123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,489.048,1/27/2019,20:33,Ewallet,465.76,4.761905,23.288,8.4
4,373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,634.3785,2/8/2019,10:37,Ewallet,604.17,4.761905,30.2085,5.3
5,699-14-3026,C,Naypyitaw,Normal,Male,Electronic accessories,85.39,7,29.8865,627.6165,3/25/2019,18:30,Ewallet,597.73,4.761905,29.8865,4.1


In [5]:
print(sales.printSchema())

root
 |-- Invoice ID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Unit price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Tax 5%: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- gross margin percentage: double (nullable = true)
 |-- gross income: double (nullable = true)
 |-- Rating: double (nullable = true)

None


Looks like we need to convert the date field into a date type. Let's go ahead and do that..

In [8]:
from pyspark.sql.types import *
# from pyspark.sql.functions import *

print("Note that this method gives all null values")
df = sales.withColumn("Formatted Date", sales["Date"].cast(DateType()))
df = df.select("Date","Formatted Date")
print(df.limit(6).toPandas())

print(" ")
print("This result gives the wrong results (notice that all months are january)")
sales.select('Date',to_date(sales.Date, 'm/d/yyyy').alias('Dateformatted'),month(to_date(sales.Date, 'm/d/yyyy')).alias('Month'),).show(5)

print(" ")
print("But if we capitalize the mm part in the format, we get the correct results!")
print("Props to your fellow classmate David Henderson for figuring this one out!")
sales.select('Date',to_date(sales.Date, 'M/d/yyyy').alias('Dateformatted'),month(to_date(sales.Date, 'M/d/yyyy')).alias('Month'),).show(5)

Note that this method gives all null values
        Date Formatted Date
0   1/5/2019           None
1   3/8/2019           None
2   3/3/2019           None
3  1/27/2019           None
4   2/8/2019           None
5  3/25/2019           None
 
This result gives the wrong results (notice that all months are january)
+---------+-------------+-----+
|     Date|Dateformatted|Month|
+---------+-------------+-----+
| 1/5/2019|   2019-01-05|    1|
| 3/8/2019|   2019-01-08|    1|
| 3/3/2019|   2019-01-03|    1|
|1/27/2019|   2019-01-27|    1|
| 2/8/2019|   2019-01-08|    1|
+---------+-------------+-----+
only showing top 5 rows

 
But if we capitalize the mm part in the format, we get the correct results!
Props to your fellow classmate David Henderson for figuring this one out!
+---------+-------------+-----+
|     Date|Dateformatted|Month|
+---------+-------------+-----+
| 1/5/2019|   2019-01-05|    1|
| 3/8/2019|   2019-03-08|    3|
| 3/3/2019|   2019-03-03|    3|
|1/27/2019|   2019-01-27|   

## Another way we can extract the month value from the date field

If your date format is uncommon, or you are not getting the expected results from the output above, you could also use this method to get what you need. 

In [20]:
# We need to creative here
# First split the date field and get the month value 
df = sales.select('Date',split(sales.Date, '/')[0].alias('Month'),'Total')

# Verify everything worked correctly
print("Verify")
df.show(5)
print(df.printSchema())

Verify
+---------+-----+--------+
|     Date|Month|   Total|
+---------+-----+--------+
| 1/5/2019|    1|548.9715|
| 3/8/2019|    3|   80.22|
| 3/3/2019|    3|340.5255|
|1/27/2019|    1| 489.048|
| 2/8/2019|    2|634.3785|
+---------+-----+--------+
only showing top 5 rows

root
 |-- Date: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Total: double (nullable = true)

None


## 11.0 Working with Arrays

Here is a dataframe of reviews from the movie the Dark Night.

In [3]:
from pyspark.sql.functions import *

values = [(5,'Epic. This is the best movie I have EVER seen'), \
          (4,'Pretty good, but I would have liked to seen better special effects'), \
          (3,'So so. Casting could have been improved'), \
          (5,'The most EPIC movie of the year! Casting was awesome. Special effects were so intense.'), \
          (4,'Solid but I would have liked to see more of the love story'), \
          (5,'THE BOMB!!!!!!!')]
reviews = spark.createDataFrame(values,['rating', 'review_txt'])

reviews.show(6,False)

+------+--------------------------------------------------------------------------------------+
|rating|review_txt                                                                            |
+------+--------------------------------------------------------------------------------------+
|5     |Epic. This is the best movie I have EVER seen                                         |
|4     |Pretty good, but I would have liked to seen better special effects                    |
|3     |So so. Casting could have been improved                                               |
|5     |The most EPIC movie of the year! Casting was awesome. Special effects were so intense.|
|4     |Solid but I would have liked to see more of the love story                            |
|5     |THE BOMB!!!!!!!                                                                       |
+------+--------------------------------------------------------------------------------------+



## 11.1 Let's see if we can create an array off of the review text column and then derive some meaningful results from it.

**But first** we need to clean the rview_txt column to make sure we can get what we need from our analysis later on. So let's do the following:

1. Remove all punctuation
2. lower case everything
3. Remove white space (trim)
3. Then finally, split the string

In [4]:
# We can do 1-3 in one call here
df = reviews.withColumn("cleaned_reviews", trim(lower(regexp_replace(col('review_txt'),'[^\sa-zA-Z0-9]', ''))))
df.show(1,False)

+------+---------------------------------------------+--------------------------------------------+
|rating|review_txt                                   |cleaned_reviews                             |
+------+---------------------------------------------+--------------------------------------------+
|5     |Epic. This is the best movie I have EVER seen|epic this is the best movie i have ever seen|
+------+---------------------------------------------+--------------------------------------------+
only showing top 1 row



In [5]:
# Then split on the spaces!
df = df.withColumn("review_txt_array", split(col("cleaned_reviews"), " "))
df.show(1,False)

+------+---------------------------------------------+--------------------------------------------+-------------------------------------------------------+
|rating|review_txt                                   |cleaned_reviews                             |review_txt_array                                       |
+------+---------------------------------------------+--------------------------------------------+-------------------------------------------------------+
|5     |Epic. This is the best movie I have EVER seen|epic this is the best movie i have ever seen|[epic, this, is, the, best, movie, i, have, ever, seen]|
+------+---------------------------------------------+--------------------------------------------+-------------------------------------------------------+
only showing top 1 row



## 11.2 Alright now let's see if we can find which reviews contain the word 'Epic'

In [8]:
epic = df.withColumn("result",array_contains(col("review_txt_array"), "epic"))
epic.toPandas()

Unnamed: 0,rating,review_txt,cleaned_reviews,review_txt_array,result
0,5,Epic. This is the best movie I have EVER seen,epic this is the best movie i have ever seen,"[epic, this, is, the, best, movie, i, have, ev...",True
1,4,"Pretty good, but I would have liked to seen be...",pretty good but i would have liked to seen bet...,"[pretty, good, but, i, would, have, liked, to,...",False
2,3,So so. Casting could have been improved,so so casting could have been improved,"[so, so, casting, could, have, been, improved]",False
3,5,The most EPIC movie of the year! Casting was a...,the most epic movie of the year casting was aw...,"[the, most, epic, movie, of, the, year, castin...",True
4,4,Solid but I would have liked to see more of th...,solid but i would have liked to see more of th...,"[solid, but, i, would, have, liked, to, see, m...",False
5,5,THE BOMB!!!!!!!,the bomb,"[the, bomb]",False


### That's it! Great Job!

# 3. Handling Missing Data in PySpark HW Solutions



In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("nulls").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in the dataset for this Notebook

In [68]:
df = spark.read.csv('Datasets/Weather.csv',inferSchema=True,header=True)

## About this dataset

**New York City Taxi Trip - Hourly Weather Data**

Here is some detailed weather data for the New York City Taxi Trips.

**Source:** https://www.kaggle.com/meinertsen/new-york-city-taxi-trip-hourly-weather-data

### Print a view of the first several lines of the dataframe to see what our data looks like

In [69]:
df.limit(8).toPandas()

Unnamed: 0,pickup_datetime,tempm,tempi,dewptm,dewpti,hum,wspdm,wspdi,wgustm,wgusti,...,precipm,precipi,conds,icon,fog,rain,snow,hail,thunder,tornado
0,2015-12-31 00:15:00,7.8,46.0,6.1,43.0,89.0,7.4,4.6,,,...,0.5,0.02,Light Rain,rain,0,1,0,0,0,0
1,2015-12-31 00:42:00,7.8,46.0,6.1,43.0,89.0,7.4,4.6,,,...,0.8,0.03,Overcast,cloudy,0,0,0,0,0,0
2,2015-12-31 00:51:00,7.8,46.0,6.1,43.0,89.0,5.6,3.5,,,...,0.8,0.03,Overcast,cloudy,0,0,0,0,0,0
3,2015-12-31 01:51:00,7.2,45.0,5.6,42.1,90.0,7.4,4.6,,,...,0.3,0.01,Overcast,cloudy,0,0,0,0,0,0
4,2015-12-31 02:51:00,7.2,45.0,5.6,42.1,90.0,0.0,0.0,,,...,,,Overcast,cloudy,0,0,0,0,0,0
5,2015-12-31 03:28:00,6.7,44.1,5.0,41.0,89.0,7.4,4.6,,,...,,,Overcast,cloudy,0,0,0,0,0,0
6,2015-12-31 03:40:00,7.2,45.0,5.0,41.0,86.0,0.0,0.0,,,...,,,Overcast,cloudy,0,0,0,0,0,0
7,2015-12-31 03:51:00,7.2,45.0,5.0,41.0,86.0,7.4,4.6,,,...,,,Overcast,cloudy,0,0,0,0,0,0


### Print the schema 

So that we can see if we need to make any corrections to the data types.

In [70]:
print(df.printSchema())

root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- tempm: double (nullable = true)
 |-- tempi: double (nullable = true)
 |-- dewptm: double (nullable = true)
 |-- dewpti: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- wspdm: double (nullable = true)
 |-- wspdi: double (nullable = true)
 |-- wgustm: double (nullable = true)
 |-- wgusti: double (nullable = true)
 |-- wdird: integer (nullable = true)
 |-- wdire: string (nullable = true)
 |-- vism: double (nullable = true)
 |-- visi: double (nullable = true)
 |-- pressurem: double (nullable = true)
 |-- pressurei: double (nullable = true)
 |-- windchillm: double (nullable = true)
 |-- windchilli: double (nullable = true)
 |-- heatindexm: double (nullable = true)
 |-- heatindexi: double (nullable = true)
 |-- precipm: double (nullable = true)
 |-- precipi: double (nullable = true)
 |-- conds: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- fog: integer (nullable = true)
 |-- rain: integer (nullab

## 1. How much missing data are we working with?

Get a count and percentage of each variable in the dataset to answer this question.

In [71]:
from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)
spark.createDataFrame(null_columns_calc_list, ['Column_With_Null_Value', 'Null_Values_Count','Null_Value_Percent']).show()

+----------------------+-----------------+-------------------+
|Column_With_Null_Value|Null_Values_Count| Null_Value_Percent|
+----------------------+-----------------+-------------------+
|                 tempm|                5|0.04770537162484496|
|                 tempi|                5|0.04770537162484496|
|                dewptm|                5|0.04770537162484496|
|                dewpti|                5|0.04770537162484496|
|                   hum|                5|0.04770537162484496|
|                 wspdm|              737|  7.031771777502146|
|                 wspdi|              737|  7.031771777502146|
|                wgustm|             8605|  82.10094456635817|
|                wgusti|             8605|  82.10094456635817|
|                  vism|              245| 2.3375632096174033|
|                  visi|              245| 2.3375632096174033|
|             pressurem|              239| 2.2803167636675887|
|             pressurei|              239| 2.2803167636

## 2. How many rows contain at least one null value?

We want to know, if we use the df.ha option, how many rows will we loose. 

In [74]:
og_len = df.count()
drop_len = df.na.drop().count()
print("Total Rows in the DF: ",og_len)
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows in the DF:  10481
Total Rows Dropped: 10481
Percentage of Rows Dropped 1.0


Yikes! Everything

## 3. Drop the missing data

Drop any row that contains missing data across the whole dataset

In [75]:
dropped = df.na.drop()
dropped.limit(4).toPandas() 

# Note this statement is equivilant to the above:
# zomato.na.drop(how='any').limit(4).toPandas() 

Unnamed: 0,pickup_datetime,tempm,tempi,dewptm,dewpti,hum,wspdm,wspdi,wgustm,wgusti,...,precipm,precipi,conds,icon,fog,rain,snow,hail,thunder,tornado


Yep, we have no more data :(

## 4. Drop with a threshold

Count how many rows would be dropped if we only dropped rows that had a least 12 NON-Null values

In [83]:
og_len = df.count()
drop_len = df.na.drop(thresh=12).count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 5
Percentage of Rows Dropped 0.00047705371624844956


## 5. Drop rows according to specific column value

Now count how many rows would be dropped if you only drop rows whose values in the tempm column are null/NaN

In [84]:
og_len = df.count()
drop_len = df.na.drop(subset=["tempm"]).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 5
Percentage of Rows Dropped 0.00047705371624844956


In [88]:
# Another way to do the above
og_len = df.count()
drop_len = df.filter(df.tempm.isNotNull()).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 5
Percentage of Rows Dropped 0.00047705371624844956


## 6. Drop rows that are null accross all columns

Count how many rows would be dropped if you only dropped rows where ALL the values are null

In [85]:
og_len = df.count()
drop_len = df.na.drop(how='all').count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 0
Percentage of Rows Dropped 0.0


That's good news!

## 7. Fill in all the string columns missing values with the word "N/A"

Make sure you don't edit the df dataframe itself. Create a copy of the df then edit that one.

In [89]:
null_fill = df.na.fill('N/A')
null_fill.limit(4).toPandas()

Unnamed: 0,pickup_datetime,tempm,tempi,dewptm,dewpti,hum,wspdm,wspdi,wgustm,wgusti,...,precipm,precipi,conds,icon,fog,rain,snow,hail,thunder,tornado
0,2015-12-31 00:15:00,7.8,46.0,6.1,43.0,89.0,7.4,4.6,,,...,0.5,0.02,Light Rain,rain,0,1,0,0,0,0
1,2015-12-31 00:42:00,7.8,46.0,6.1,43.0,89.0,7.4,4.6,,,...,0.8,0.03,Overcast,cloudy,0,0,0,0,0,0
2,2015-12-31 00:51:00,7.8,46.0,6.1,43.0,89.0,5.6,3.5,,,...,0.8,0.03,Overcast,cloudy,0,0,0,0,0,0
3,2015-12-31 01:51:00,7.2,45.0,5.6,42.1,90.0,7.4,4.6,,,...,0.3,0.01,Overcast,cloudy,0,0,0,0,0,0


## 8. Fill in NaN values with averages for the tempm and tempi columns

*Note: you will first need to compute the averages for each column and then fill in with the corresponding value.*

In [91]:
def fill_with_mean(df, include=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in include
    ))
#     stats = stats.select(*(col(c).cast("int").alias(c) for c in stats.columns)) #IntegerType()
    return df.na.fill(stats.first().asDict())

updated_df = fill_with_mean(df, ["tempm","tempi"])
updated_df.limit(5).toPandas()

Unnamed: 0,pickup_datetime,tempm,tempi,dewptm,dewpti,hum,wspdm,wspdi,wgustm,wgusti,...,precipm,precipi,conds,icon,fog,rain,snow,hail,thunder,tornado
0,2015-12-31 00:15:00,7.8,46.0,6.1,43.0,89.0,7.4,4.6,,,...,0.5,0.02,Light Rain,rain,0,1,0,0,0,0
1,2015-12-31 00:42:00,7.8,46.0,6.1,43.0,89.0,7.4,4.6,,,...,0.8,0.03,Overcast,cloudy,0,0,0,0,0,0
2,2015-12-31 00:51:00,7.8,46.0,6.1,43.0,89.0,5.6,3.5,,,...,0.8,0.03,Overcast,cloudy,0,0,0,0,0,0
3,2015-12-31 01:51:00,7.2,45.0,5.6,42.1,90.0,7.4,4.6,,,...,0.3,0.01,Overcast,cloudy,0,0,0,0,0,0
4,2015-12-31 02:51:00,7.2,45.0,5.6,42.1,90.0,0.0,0.0,,,...,,,Overcast,cloudy,0,0,0,0,0,0


### That's it! Great Job!

# 4. Search and Filter DataFrames in PySpark Homework Solutions

In [1]:
# First let's create our PySpark instance
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("FunctionsHW").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in the DataFrame for this Notebook

We will be continuing to use the fifa19.csv file for this notebook. Make sure that you are writting the correct path to the file. 

In [2]:
fifa = spark.read.csv('Datasets/fifa19.csv',inferSchema=True,header=True)

Use the .toPandas() method to view the first few lines of the dataset so we know what we are working with. 

In [3]:
fifa.limit(4).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,2,190871,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png,Brazil,https://cdn.sofifa.org/flags/54.png,92,93,Paris Saint-Germain,...,94,27,24,33,9,9,15,15,11,€228.1M
3,3,193080,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png,Spain,https://cdn.sofifa.org/flags/45.png,91,93,Manchester United,...,68,15,21,13,90,85,87,88,94,€138.6M


Now print the schema of the dataset so we can see the data types of all the varaibles. 

In [4]:
print(fifa.printSchema())

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Photo: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Flag: string (nullable = true)
 |-- Overall: integer (nullable = true)
 |-- Potential: integer (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club Logo: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Wage: string (nullable = true)
 |-- Special: integer (nullable = true)
 |-- Preferred Foot: string (nullable = true)
 |-- International Reputation: integer (nullable = true)
 |-- Weak Foot: integer (nullable = true)
 |-- Skill Moves: integer (nullable = true)
 |-- Work Rate: string (nullable = true)
 |-- Body Type: string (nullable = true)
 |-- Real Face: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Jersey Number: integer (nullable = true)
 |-- Joined: string (nullable = true)
 |-- Loaned From: string (nu

In [5]:
from pyspark.sql.functions import *

### 1. Select the Name and Position of each player in the dataframe

In [6]:
fifa.select(['Name','Position','Release Clause']).show(5,False)

+-----------------+--------+--------------+
|Name             |Position|Release Clause|
+-----------------+--------+--------------+
|L. Messi         |RF      |€226.5M       |
|Cristiano Ronaldo|ST      |€127.1M       |
|Neymar Jr        |LW      |€228.1M       |
|De Gea           |GK      |€138.6M       |
|K. De Bruyne     |RCM     |€196.4M       |
+-----------------+--------+--------------+
only showing top 5 rows



### 1.1 Display the same results from above sorted by the players names

In [7]:
fifa.select(['Name','Position']).orderBy('Name').show(5)

+-------------+--------+
|         Name|Position|
+-------------+--------+
|     A. Abang|      ST|
|A. Abdellaoui|      LB|
| A. Abdennour|      CB|
|      A. Abdi|      CM|
|A. Abdu Jaber|      ST|
+-------------+--------+
only showing top 5 rows



### 2. Select only the players who belong to a club begining with FC

In [8]:
# One way
fifa.select("Name","Club").where(fifa.Club.like("FC%")).show(5, False)

+---------------+-----------------+
|Name           |Club             |
+---------------+-----------------+
|L. Messi       |FC Barcelona     |
|L. Suárez      |FC Barcelona     |
|R. Lewandowski |FC Bayern München|
|M. ter Stegen  |FC Barcelona     |
|Sergio Busquets|FC Barcelona     |
+---------------+-----------------+
only showing top 5 rows



In [9]:
# Another way 
fifa.select("Name","Club").where(fifa.Club.startswith("FC")).limit(4).toPandas()

Unnamed: 0,Name,Club
0,L. Messi,FC Barcelona
1,L. Suárez,FC Barcelona
2,R. Lewandowski,FC Bayern München
3,M. ter Stegen,FC Barcelona


### 3. Who is the oldest player in the dataset and how old are they?

Display only the name and age of the oldest player.

In [10]:
fifa.select("Name","Age").sort(desc("Age")).show(1)

+--------+---+
|    Name|Age|
+--------+---+
|O. Pérez| 45|
+--------+---+
only showing top 1 row



### 4. Select only the following players from the dataframe:

 - L. Messi
 - Cristiano Ronaldo

In [11]:
fifa[fifa.Name.isin("L. Messi", "Cristiano Ronaldo")].limit(4).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M


### 5. Can you select the first character from the Release Clause variable which indicates the currency used?

In [12]:
fifa.select("Release Clause",fifa["Release Clause"].substr(1,1)).show(5,False)

+--------------+-------------------------------+
|Release Clause|substring(Release Clause, 1, 1)|
+--------------+-------------------------------+
|€226.5M       |€                              |
|€127.1M       |€                              |
|€228.1M       |€                              |
|€138.6M       |€                              |
|€196.4M       |€                              |
+--------------+-------------------------------+
only showing top 5 rows



### 6. Can you select only the players who are over the age of 40?

In [13]:
fifa.filter("Age>40").limit(4).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,1120,156092,J. Villar,41,https://cdn.sofifa.org/players/4/19/156092.png,Paraguay,https://cdn.sofifa.org/flags/58.png,77,77,,...,55,13,13,14,75,75,74,78,77,
1,4228,3665,B. Nivet,41,https://cdn.sofifa.org/players/4/19/3665.png,France,https://cdn.sofifa.org/flags/18.png,71,71,ESTAC Troyes,...,82,58,56,43,11,7,8,14,7,
2,4741,140029,O. Pérez,45,https://cdn.sofifa.org/players/4/19/140029.png,Mexico,https://cdn.sofifa.org/flags/83.png,71,71,Pachuca,...,62,23,12,11,70,64,65,73,74,€272K
3,7225,142998,C. Muñoz,41,https://cdn.sofifa.org/players/4/19/142998.png,Argentina,https://cdn.sofifa.org/flags/52.png,68,68,CD Universidad de Concepción,...,62,18,14,19,67,65,68,71,68,€84K


### That's is for now... Great Job!

# 5. Joining and Appending DataFrames in PySpark HW Solutions

Now it's time to test your knowledge and further engrain the concepts we touched on in the lectures. Let's go ahead and get started.




**As always let's start our Spark instance.**

In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("joins").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [2]:
import os

path = "Datasets/uw-madison-courses/"

df_list = []
for filename in os.listdir(path):
    if filename.endswith(".csv"):
        filename_list = filename.split(".") #separate path from .csv
        df_name = filename_list[0]
        df = spark.read.csv(path+filename,inferSchema=True,header=True)
        df.name = df_name
        df_list.append(df_name)
        exec(df_name + ' = df')
        
# QA
print("Full list of dfs:")
print(df_list)

Full list of dfs:
['subjects', 'subject_memberships', 'rooms', 'schedules', 'sections', 'courses', 'course_offerings', 'instructors', 'teachings', 'grade_distributions']


Now check the contents of a few of the dataframes that were read in above.

In [4]:
grade_distributions.limit(4).toPandas()

Unnamed: 0,course_offering_uuid,section_number,a_count,ab_count,b_count,bc_count,c_count,d_count,f_count,s_count,u_count,cr_count,n_count,p_count,i_count,nw_count,nr_count,other_count
0,344b3ebe-da7e-314c-83ed-9425269695fd,1,105,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,f718e6cd-33f0-3c14-a9a6-834d9c3610a8,1,158,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0
2,ea3b717c-d66b-30dc-8b37-964d9688295f,1,139,12,2,0,3,0,0,0,0,0,0,0,0,0,0,0
3,075da420-5f49-3dd0-93df-13e3c152e1b1,1,87,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0


## 1a. Can you assign the room numbers to each section of each course?

Show only the rooms uuid, facility code, room number, term code and the name of the course from the course_offerings table.

In [3]:
step1 = rooms.join(sections, rooms.uuid == sections.room_uuid, how='left').select([rooms.uuid,rooms.facility_code,sections.course_offering_uuid,'number'])
step1.limit(4).toPandas()

Unnamed: 0,uuid,facility_code,course_offering_uuid,number
0,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,611e5499-724c-3464-82f3-1a5bf29dd96c,307
1,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,43688423-905c-3455-bf40-a58adce537f7,3
2,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,43688423-905c-3455-bf40-a58adce537f7,3
3,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,871476fd-340f-303c-a7c6-44ca6ef6b84d,311


In [5]:
step2 = step1.join(course_offerings, step1.course_offering_uuid == course_offerings.uuid, how='left').select([rooms.uuid,rooms.facility_code,'number','term_code','name'])
step2.limit(4).toPandas()

Unnamed: 0,uuid,facility_code,number,term_code,name
0,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,307,1182,International Business
1,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,3,1174,Intro Managerial Accounting
2,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,3,1174,Intro Managerial Accounting
3,0feb8e9a-88b8-3d80-a13d-72c0c2fa9939,140,311,1174,Business Law


## 1b. Now show same output as above but for only facility number 0469 (facility_code)

In [6]:
step3 = step2.filter(step2.facility_code == "0469")
step3.limit(4).toPandas()

Unnamed: 0,uuid,facility_code,number,term_code,name
0,3c570ec9-1a27-3902-a16f-d1676890e63b,469,337,1082,Introduction to Philosophy
1,3c570ec9-1a27-3902-a16f-d1676890e63b,469,352,1082,Introduction to Philosophy
2,3c570ec9-1a27-3902-a16f-d1676890e63b,469,308,1102,Introduction to Philosophy
3,3c570ec9-1a27-3902-a16f-d1676890e63b,469,307,1102,Introduction to Philosophy


## 2. Count how many sections are offered for each subject for each facility

*Note: this will involve a groupby*

In [7]:
step1 = subjects.join(subject_memberships, subjects.code == subject_memberships.subject_code, how='inner').select(['name','course_offering_uuid']).withColumnRenamed('name','subject_name')
step1.limit(4).toPandas()

Unnamed: 0,subject_name,course_offering_uuid
0,Chemical and Biological Engineering,344b3ebe-da7e-314c-83ed-9425269695fd
1,Electrical and Computer Engineering,344b3ebe-da7e-314c-83ed-9425269695fd
2,Engineering Mechanics and Astronautics,344b3ebe-da7e-314c-83ed-9425269695fd
3,Mechanical Engineering,344b3ebe-da7e-314c-83ed-9425269695fd


In [5]:
step2 = step1.join(sections, step1.course_offering_uuid == sections.course_offering_uuid, how='left').select(['subject_name','room_uuid'])
step2.limit(4).toPandas()

Unnamed: 0,subject_name,room_uuid
0,Communication Arts,dd6118e0-7221-3b81-9b29-aad61f0ede54
1,Communication Arts,4e9c55b8-7c02-36c3-b907-3e832ea285e7
2,Communication Arts,4948c250-c6cf-3272-a497-f90962f3ba67
3,Communication Arts,698555af-8ec3-3b85-8e67-bd7fc53aba80


In [20]:
# I added a filter to make this a little simpler
step3 = step2.join(rooms, step2.room_uuid == rooms.uuid, how='left').filter('facility_code IN("0140","0545","0469","0031")').select(['subject_name','facility_code','room_code'])
step3.limit(4).toPandas()

Unnamed: 0,subject_name,facility_code,room_code
0,Communication Arts,140,1070
1,Communication Arts,545,4028
2,Communication Arts,545,4008
3,Communication Arts,469,2241


In [21]:
# Option 1: Group by facility code and do a count
step3.groupBy('facility_code','subject_name').count().orderBy("facility_code").show(10, False) # False prevents truncation of column content

+-------------+------------------------------------------------+-----+
|facility_code|subject_name                                    |count|
+-------------+------------------------------------------------+-----+
|0031         |Curriculum and Instruction                      |36   |
|0031         |Statistics                                      |6    |
|0031         |Philosophy                                      |1    |
|0031         |Communication Sciences and Disorders            |5    |
|0031         |Interdisciplinary Courses (L&S)                 |1    |
|0031         |Interdisciplinary Courses (Engineering)         |1    |
|0031         |Biochemistry                                    |2    |
|0031         |Electrical and Computer Engineering             |2    |
|0031         |History of Science                              |1    |
|0031         |Environmental Studies - Gaylord Nelson Institute|1    |
+-------------+------------------------------------------------+-----+
only s

In [24]:
# Option 2: Groupby subject name and pivot the facility code
# to see each facility side by side within each subject
step3.groupBy("subject_name").pivot("facility_code").count().show(10, False)

+---------------------------------+----+----+----+----+
|subject_name                     |0031|0140|0469|0545|
+---------------------------------+----+----+----+----+
|Asian American Studies           |null|6   |245 |24  |
|Religious Studies                |null|32  |282 |43  |
|HEBREW                           |null|7   |4   |null|
|Botany                           |null|4   |24  |2   |
|Urban and Regional Planning      |null|208 |17  |6   |
|Nutritional Sciences             |6   |null|null|28  |
|Kinesiology                      |1549|null|null|null|
|Art Education (Department of Art)|null|null|145 |null|
|Philosophy                       |1   |80  |1296|73  |
|Microbiology                     |null|null|1   |null|
+---------------------------------+----+----+----+----+
only showing top 10 rows



## 3. What are the hardest classes?

Let's see if we can figure out which classes are the hardest by seeing how many students failed. Note that you will first need to aggregate the grades table by the course uuid to include all sections. Show the name of the course as well that you will need to get from the course_offering table.

In [5]:
grade_distributions.limit(4).toPandas()

Unnamed: 0,course_offering_uuid,section_number,a_count,ab_count,b_count,bc_count,c_count,d_count,f_count,s_count,u_count,cr_count,n_count,p_count,i_count,nw_count,nr_count,other_count
0,344b3ebe-da7e-314c-83ed-9425269695fd,1,105,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,f718e6cd-33f0-3c14-a9a6-834d9c3610a8,1,158,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0
2,ea3b717c-d66b-30dc-8b37-964d9688295f,1,139,12,2,0,3,0,0,0,0,0,0,0,0,0,0,0
3,075da420-5f49-3dd0-93df-13e3c152e1b1,1,87,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0


In [9]:
course_offerings.limit(4).toPandas()

Unnamed: 0,uuid,course_uuid,term_code,name
0,344b3ebe-da7e-314c-83ed-9425269695fd,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1092,Cooperative Education Prog
1,f718e6cd-33f0-3c14-a9a6-834d9c3610a8,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1082,Cooperative Education Prog
2,ea3b717c-d66b-30dc-8b37-964d9688295f,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1172,Cooperative Education Prog
3,075da420-5f49-3dd0-93df-13e3c152e1b1,a3e3e1c3-543d-3bb5-ae65-5f2aec4ad1de,1114,Cooperative Education Prog


In [12]:
step1 = grade_distributions.groupBy("course_offering_uuid").sum("f_count")
step1.limit(4).toPandas()

Unnamed: 0,course_offering_uuid,sum(f_count)
0,c939dd5e-43ba-3266-8f74-573f629de75b,0
1,75fdf27b-0e52-3544-96eb-d594a5ad969f,0
2,cfeba94d-8e0e-320b-a904-ea2c2a31c000,7
3,ceabe145-78e5-33c2-9b68-3a1eea9c2864,4


In [22]:
step2 = step1.join(course_offerings, step1.course_offering_uuid == course_offerings.uuid, how='left').select(['name','sum(f_count)']).orderBy("sum(f_count)")
step2.toPandas().tail(5)

Unnamed: 0,name,sum(f_count)
80166,Calculus&Analytic Geometry,63
80167,Calculus&Analytic Geometry 1,64
80168,Calculus&Analytic Geometry,67
80169,Animal Biology,70
80170,Calc--Functns of Variables,72


## Challenge Question: Automating data entry errors

We see in the dataframe below that there are several typos of various animal names. If this was a large database of several millions of records, correcting these errors would be way too labor intensive. How can we automate correcting these errors?

*Hint: Leven...*

In [8]:
values = [('Monkey',10),('Monkay',36),('Mnky',123), \
          ('Elephant',48),('Elefant',16),('Ellafant',1), \
          ('Hippopotamus',48),('Hipopotamus',16),('Hippo',1)]
zoo = spark.createDataFrame(values,['Animal','age'])
zoo.show()

+------------+---+
|      Animal|age|
+------------+---+
|      Monkey| 10|
|      Monkay| 36|
|        Mnky|123|
|    Elephant| 48|
|     Elefant| 16|
|    Ellafant|  1|
|Hippopotamus| 48|
| Hipopotamus| 16|
|       Hippo|  1|
+------------+---+



In [14]:
# With the levenshtein distance!
from pyspark.sql.functions import *
from pyspark.sql.types import *

# First we create a dataframe with the 3 options we want to choose from
options = spark.createDataFrame(['Monkey', 'Elephant', 'Hippopotamus'], StringType())
options.show()

+------------+
|       value|
+------------+
|      Monkey|
|    Elephant|
|Hippopotamus|
+------------+



In [16]:
# And then we join the two dataframes together with a condition >5
results = zoo.join(options, levenshtein(zoo["Animal"], options["value"]) < 5, 'left')
results.show()

+------------+---+------------+
|      Animal|age|       value|
+------------+---+------------+
|      Monkey| 10|      Monkey|
|      Monkay| 36|      Monkey|
|        Mnky|123|      Monkey|
|    Elephant| 48|    Elephant|
|     Elefant| 16|    Elephant|
|    Ellafant|  1|    Elephant|
|Hippopotamus| 48|Hippopotamus|
| Hipopotamus| 16|Hippopotamus|
|       Hippo|  1|        null|
+------------+---+------------+



So we can see here that all of our values were correctly identified except for "Hippo" which was just way too different from "Hippopotamus" to get correctly identified. So this solution won't work for EVERY case, but we can see here that it did a great job correcting simple gramatical errors. 

# 6. Aggregating DataFrames in PySpark HW Solutions


In [2]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("aggregate").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in the dataFrame for this Notebook

In [3]:
airbnb = spark.read.csv('nyc_air_bnb.csv',inferSchema=True,header=True)

In [3]:
airbnb.limit(5).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


In [4]:
print(airbnb.printSchema())

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Notice here that some of the columns that are obviously numeric have been incorrectly identified as "strings". Let's edit that. Otherwise we cannot aggregate any of the numeric columns.

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df = airbnb.withColumn("price", airbnb["price"].cast(IntegerType())) \
        .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType())) \
        .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType())) \
        .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType())) \
        .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))
#QA
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.0,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.0,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.0,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.0,1,0


### Alright now we are ready to dig in!


### 1. How many rows are in this dataset?

In [5]:
df.count()

49079

### 2. How many total reviews does each host have?

In [6]:
df.groupBy("host_id").sum('number_of_reviews').show(10)

+-------+----------------------+
|host_id|sum(number_of_reviews)|
+-------+----------------------+
| 716306|                   197|
|1203500|                    35|
| 368528|                     1|
|1577493|                    16|
|1390555|                    50|
|1317588|                     3|
|2472680|                   219|
|2155832|                   266|
|2426404|                     6|
|2740824|                    22|
+-------+----------------------+
only showing top 10 rows



### 3. Show the min and max of all the numeric variables in the dataset

In [11]:
limit_summary = df.select("price","minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365").summary("min","max")
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,min,-74,0,0,-73.94134,0,0,0
1,max,10000,1250,629,9.66,58,365,365


### 4. Which host had the highest number of reviews?

Only display the top result.

Bonus: format the column names

In [12]:
from pyspark.sql import functions
df.groupBy("host_id").agg(sum("number_of_reviews").alias("Reviews")).orderBy(sum("number_of_reviews").desc()).show(1) 

+--------+-------+
| host_id|Reviews|
+--------+-------+
|37312959|   2273|
+--------+-------+
only showing top 1 row



### 5. On average, how many nights did most hosts specify for a minimum?

In [23]:
df.agg({'minimum_nights':'avg'}).withColumnRenamed("avg(minimum_nights)", "Avg Min Nights").show()

+------------------+
|    Avg Min Nights|
+------------------+
|7.1286126280910596|
+------------------+



In [10]:
df.agg(mean(df.minimum_nights)).show()

+-------------------+
|avg(minimum_nights)|
+-------------------+
| 7.1286126280910596|
+-------------------+



### 6. What is the most expensive neighborhood to stay in on average?

Note: only show the one result

In [13]:
result = df.groupBy("neighbourhood").agg(avg(df.price).alias('avg_price'))
result.orderBy(result.avg_price.desc()).show(1) 

+--------------+---------+
| neighbourhood|avg_price|
+--------------+---------+
|Fort Wadsworth|    800.0|
+--------------+---------+
only showing top 1 row



### 7. Display a two by two table that shows the average prices by room type (private and shared only) and neighborhood group (Manhattan and Brooklyn only)

In [11]:
df.filter("room_type IN('Private room','Shared room')").groupBy("room_type").pivot("neighbourhood_group", ["Manhattan", "Brooklyn"]).avg('price').show(100)

+------------+------------------+-----------------+
|   room_type|         Manhattan|         Brooklyn|
+------------+------------------+-----------------+
| Shared room| 89.06903765690376|50.52784503631961|
|Private room|116.05400302114803|76.47234042553191|
+------------+------------------+-----------------+



# 7. SQL Options in Spark HW Solutions

Alirght let's apply what we learned in the lecture to a new dataset!

**But first!**

Let's start with Spark SQL. But first we need to create a Spark Session!

In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("SparkSQLHWSolutions").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in our DataFrame for this Notebook

For this notebook we will be using the Google Play Store csv file attached to this lecture. Let's go ahead and read it in. 

### About this dataset

Contains a list of Google Play Store Apps and info about the apps like the category, rating, reviews, size, etc. 

**Source:** https://www.kaggle.com/lava18/google-play-store-apps

In [2]:
path = 'Datasets/'

googlep = spark.read.csv(path+"googleplaystore.csv",header=True,inferSchema=True)

## First things first

Let's check out the first few lines of the dataframe to see what we are working with

In [3]:
# This is way better
googlep.limit(5).toPandas()

Unnamed: 0,App,Category,Rating,Reviews,Size,Installs,Type,Price,Content Rating,Genres,Last Updated,Current Ver,Android Ver
0,Photo Editor & Candy Camera & Grid & ScrapBook,ART_AND_DESIGN,4.1,159,19M,"10,000+",Free,0,Everyone,Art & Design,"January 7, 2018",1.0.0,4.0.3 and up
1,Coloring book moana,ART_AND_DESIGN,3.9,967,14M,"500,000+",Free,0,Everyone,Art & Design;Pretend Play,"January 15, 2018",2.0.0,4.0.3 and up
2,"U Launcher Lite – FREE Live Cool Themes, Hide ...",ART_AND_DESIGN,4.7,87510,8.7M,"5,000,000+",Free,0,Everyone,Art & Design,"August 1, 2018",1.2.4,4.0.3 and up
3,Sketch - Draw & Paint,ART_AND_DESIGN,4.5,215644,25M,"50,000,000+",Free,0,Teen,Art & Design,"June 8, 2018",Varies with device,4.2 and up
4,Pixel Draw - Number Art Coloring Book,ART_AND_DESIGN,4.3,967,2.8M,"100,000+",Free,0,Everyone,Art & Design;Creativity,"June 20, 2018",1.1,4.4 and up


As well as the schema to make sure all the column types were correctly infered

In [4]:
print(googlep.printSchema())

root
 |-- App: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Reviews: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Installs: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Content Rating: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Last Updated: string (nullable = true)
 |-- Current Ver: string (nullable = true)
 |-- Android Ver: string (nullable = true)

None


Looks like we need to edit some of the datatypes. Let's just update Rating, Reviews and Price as integer (float for Rating) values for now, since the Size and Installs variables will need a bit more cleaning.

In [5]:
from pyspark.sql.types import IntegerType, FloatType
df = googlep.withColumn("Rating", googlep["Rating"].cast(FloatType())) \
            .withColumn("Reviews", googlep["Reviews"].cast(IntegerType())) \
            .withColumn("Price", googlep["Price"].cast(IntegerType()))
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- App: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Rating: float (nullable = true)
 |-- Reviews: integer (nullable = true)
 |-- Size: string (nullable = true)
 |-- Installs: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Content Rating: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Last Updated: string (nullable = true)
 |-- Current Ver: string (nullable = true)
 |-- Android Ver: string (nullable = true)

None


Unnamed: 0,App,Category,Rating,Reviews,Size,Installs,Type,Price,Content Rating,Genres,Last Updated,Current Ver,Android Ver
0,Photo Editor & Candy Camera & Grid & ScrapBook,ART_AND_DESIGN,4.1,159,19M,"10,000+",Free,0,Everyone,Art & Design,"January 7, 2018",1.0.0,4.0.3 and up
1,Coloring book moana,ART_AND_DESIGN,3.9,967,14M,"500,000+",Free,0,Everyone,Art & Design;Pretend Play,"January 15, 2018",2.0.0,4.0.3 and up
2,"U Launcher Lite – FREE Live Cool Themes, Hide ...",ART_AND_DESIGN,4.7,87510,8.7M,"5,000,000+",Free,0,Everyone,Art & Design,"August 1, 2018",1.2.4,4.0.3 and up
3,Sketch - Draw & Paint,ART_AND_DESIGN,4.5,215644,25M,"50,000,000+",Free,0,Teen,Art & Design,"June 8, 2018",Varies with device,4.2 and up
4,Pixel Draw - Number Art Coloring Book,ART_AND_DESIGN,4.3,967,2.8M,"100,000+",Free,0,Everyone,Art & Design;Creativity,"June 20, 2018",1.1,4.4 and up


Looks like that worked! Great! Let's dig in. 

## 1. Create Tempview

Go ahead and create a tempview of the dataframe so we can work with it in spark sql.

In [6]:
# Create a temporary view of the dataframe
df.createOrReplaceTempView("tempview")

## 2. Select all apps with ratings above 4.1

Use your tempview to select all apps with ratings above 4.1

In [8]:
# Then Query the temp view
spark.sql("SELECT * FROM tempview WHERE Rating > 4.1").limit(5).toPandas()

Unnamed: 0,App,Category,Rating,Reviews,Size,Installs,Type,Price,Content Rating,Genres,Last Updated,Current Ver,Android Ver
0,"U Launcher Lite – FREE Live Cool Themes, Hide ...",ART_AND_DESIGN,4.7,87510,8.7M,"5,000,000+",Free,0,Everyone,Art & Design,"August 1, 2018",1.2.4,4.0.3 and up
1,Sketch - Draw & Paint,ART_AND_DESIGN,4.5,215644,25M,"50,000,000+",Free,0,Teen,Art & Design,"June 8, 2018",Varies with device,4.2 and up
2,Pixel Draw - Number Art Coloring Book,ART_AND_DESIGN,4.3,967,2.8M,"100,000+",Free,0,Everyone,Art & Design;Creativity,"June 20, 2018",1.1,4.4 and up
3,Paper flowers instructions,ART_AND_DESIGN,4.4,167,5.6M,"50,000+",Free,0,Everyone,Art & Design,"March 26, 2017",1.0,2.3 and up
4,Garden Coloring Book,ART_AND_DESIGN,4.4,13791,33M,"1,000,000+",Free,0,Everyone,Art & Design,"September 20, 2017",2.9.2,3.0 and up


## 3. Now pass your results to an object (ie create a spark dataframe)

Select just the App and Rating column where the Category is in the Comic category and the Rating is above 4.5.

In [9]:
# Or pass it to an object
sql_results = spark.sql("SELECT App,Rating FROM tempview WHERE Category = 'COMICS' AND Rating > 4.5")
sql_results.limit(5).toPandas()

Unnamed: 0,App,Rating
0,Manga Master - Best manga & comic reader,4.6
1,GANMA! - All original stories free of charge f...,4.7
2,Röhrich Werner Soundboard,4.7
3,Unicorn Pokez - Color By Number,4.8
4,Manga - read Thai translation,4.6


## 4. Which category has the most cumulative reviews

Only select the one category with the most reivews. 

*Note: will require adding all the review together for each category*

In [7]:
spark.sql("SELECT Category, sum(Reviews) AS Total_Reviews FROM tempview GROUP BY Category ORDER BY Total_Reviews DESC").limit(1).toPandas()

Unnamed: 0,Category,Total_Reviews
0,GAME,1585422349


## 5. Which App has the most reviews?

Display ONLY the top result

Include only the App column and the Reviews column.

In [11]:
spark.sql("SELECT App, Reviews FROM tempview ORDER BY Reviews DESC").show(1)

+--------+--------+
|     App| Reviews|
+--------+--------+
|Facebook|78158306|
+--------+--------+
only showing top 1 row



## 5. Select all apps that contain the word 'dating' anywhere in the title

*Note: we did not cover this in the lecture. You'll have to use your SQL knowledge :) Google it if you need to.*

In [12]:
spark.sql("SELECT * FROM tempview WHERE App LIKE '%dating%'").limit(5).toPandas()

Unnamed: 0,App,Category,Rating,Reviews,Size,Installs,Type,Price,Content Rating,Genres,Last Updated,Current Ver,Android Ver
0,"Meet, chat & date. Free dating app - Chocolate...",DATING,3.9,8661,9.5M,"1,000,000+",Free,0,Mature 17+,Dating,"April 3, 2018",0.1.11,4.0 and up
1,Friend Find: free chat + flirt dating app,DATING,,23,11M,100+,Free,0,Mature 17+,Dating,"July 31, 2018",1.0,4.4 and up
2,Spine- The dating app,DATING,5.0,5,9.3M,500+,Free,0,Teen,Dating,"July 14, 2018",4.0,4.0.3 and up
3,Princess Closet : Otome games free dating sim,FAMILY,4.5,29495,56M,"1,000,000+",Free,0,Teen,Simulation,"May 24, 2018",1.11.0,4.0.3 and up
4,happn – Local dating app,LIFESTYLE,4.3,1118201,Varies with device,"10,000,000+",Free,0,Mature 17+,Lifestyle,"July 24, 2018",Varies with device,Varies with device


## 6. Use SQL Transformer to display how many free apps there are in this list

In [13]:
# First we need to import SQL transformer
from pyspark.ml.feature import SQLTransformer

In [14]:
sqlTrans = SQLTransformer(
    statement="SELECT count(*) FROM __THIS__ WHERE Type = 'Free'") 
sqlTrans.transform(df).show()

+--------+
|count(1)|
+--------+
|   10037|
+--------+



## 7. What is the most popular Genre?

Which genre appears most often in the dataframe. Show only the top result.

In [15]:
sqlTrans = SQLTransformer(
    statement="SELECT Genres, count(*) as Total FROM __THIS__ GROUP BY Genres ORDER BY Total DESC") 
sqlTrans.transform(df).show(1)

+------+-----+
|Genres|Total|
+------+-----+
| Tools|  842|
+------+-----+
only showing top 1 row



## 8. Select all the apps in the 'Tools' genre that have more than 100 reviews

In [16]:
sqlTrans = SQLTransformer(
    statement="SELECT App, Reviews FROM __THIS__ WHERE Genres = 'Tools' AND Reviews > 100") 
sqlTrans.transform(df).show(10)

+--------------------+--------+
|                 App| Reviews|
+--------------------+--------+
|   Moto File Manager|   38655|
|              Google| 8033493|
|    Google Translate| 5745093|
|        Moto Display|   18239|
|      Motorola Alert|   24199|
|     Motorola Assist|   37333|
|Cache Cleaner-DU ...|12759663|
|  Moto Suggestions ™|     308|
|          Moto Voice|   33216|
|          Calculator|   40770|
+--------------------+--------+
only showing top 10 rows

