# **AFRICAN INSTITUTE FOR MATHEMATICAL SCIENCE - RWANDA**

---
# **Big Data Analytics with Python**
## Assignment 1

### <span style="color:red">Note: all datasets where shared in the classroom</span>
---

# **Part 1: Reading, Writing and Validating Data in PySpark HW**

Welcome to your first coding homework assignment in PySpark! I hope you enjoyed the lecture on Reading, Writing and Validating dataframes. Now it's time to put what you've learned into action! 

I've included several instructions below to help guide you through this homework assignment which I hope will get you feeling even comfortable reading, writing and validating dataframes. If you get stuck at any point, feel free to jump to the next lecture where I will guide you through my solutions to the HW assignment. 

Have fun!

Let's dig right in!


## But first things first.....
We need to always begin every Spark session by creating a Spark instance. Let's go ahead and use the method we learned in the lecture in the cell below. Also see if you can remember how to open the Spark UI (using a link that automatically guides you there). 

In [1]:
import os
print(os.getcwd())

/home/guest/Desktop/BigData/V-bigdata/Datasets/Assignments


In [5]:

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Part_1.").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
appid = spark._jsc.sc().applicationId()
spark

25/11/15 19:26:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Next let's start by reading a basic csv dataset

Download the pga_tour_historical dataset that is attached to this lecture and save it whatever folder you want, then read it in. 

**Data Source:** https://www.kaggle.com/bradklassen/pga-tour-20102018-data

Rememer to try letting Spark infer the header and infer the Schema types!

In [6]:
# path ='../Day1/'
pga = spark.read.csv('pga_tour_historical.csv', header = True, inferSchema = True)

                                                                                

## 1. View first 5 lines of dataframe
First generate a view of the first 5 lines of the dataframe to get an idea of what is inside. We went over two ways of doing this... see if you can remember BOTH ways. 

In [7]:
pga.show(5)

+---------------+------+----------------+--------------------+-----+
|    Player Name|Season|       Statistic|            Variable|Value|
+---------------+------+----------------+--------------------+-----+
|Robert Garrigus|  2010|Driving Distance|Driving Distance ...|   71|
|   Bubba Watson|  2010|Driving Distance|Driving Distance ...|   77|
| Dustin Johnson|  2010|Driving Distance|Driving Distance ...|   83|
|Brett Wetterich|  2010|Driving Distance|Driving Distance ...|   54|
|    J.B. Holmes|  2010|Driving Distance|Driving Distance ...|  100|
+---------------+------+----------------+--------------------+-----+
only showing top 5 rows


## 2. Print the schema details

Now print the details of the dataframes schema that Spark infered to ensure that it was infered correctly. Sometimes it is not infered correctly, so we need to watch out!

In [8]:
pga.printSchema()

root
 |-- Player Name: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Statistic: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: string (nullable = true)



## 3. Edit the schema during the read in

We can see from the output above that Spark did not correctly infer that the "value" column was an integer value. Let's try specifying the schema this time to let spark know what the schema should be.

Here is a link to see a list of PySpark data types in case you need it (also attached to the lecture): 
https://spark.apache.org/docs/latest/sql-ref-datatypes.html

In [11]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# Define the complete schema
data_schema = StructType([
    StructField("Player Name", StringType(), True),
    StructField("Season", IntegerType(), True),
    StructField("Statistic", StringType(), True),
    StructField("Variable", StringType(), True),
    StructField("Value", IntegerType(), True)  # Changed to IntegerType
])

# Read with the defined schema
pga = spark.read.csv('pga_tour_historical.csv', header=True, schema=data_schema)

# Verify the schema
pga.printSchema()

root
 |-- Player Name: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Statistic: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: integer (nullable = true)



## 4. Generate summary statistics for only one variable

See if you can generate summary statistics for only the "Value" column using the .describe function

(count, mean, stddev, min, max) 

In [12]:
from pyspark.sql.functions import regexp_replace, col
pga.describe('Value').show()

+-------+------------------+
|summary|             Value|
+-------+------------------+
|  count|           1657247|
|   mean|12494.388998743096|
| stddev|157274.75673570696|
|    min|              -178|
|    max|           3564954|
+-------+------------------+



## 5. Generate summary statistics for TWO variables
Now try to generate ONLY the count min and max for BOTH the "Value" and "Season" variable using the select. You can't use the .describe function for this one but see if you can remember which function you CAN use. 

In [18]:
pga.select("Season", "Value").summary("count", "min", "max").show()

+-------+-------+-------+
|summary| Season|  Value|
+-------+-------+-------+
|  count|2740403|1657247|
|    min|   2010|   -178|
|    max|   2018|3564954|
+-------+-------+-------+



## 6. Write a parquet file

Now try writing a parquet file (not partitioned) from the pga dataset. But first create a new dataframe containing ONLY the the "Season" and "Value" fields (using the "select command you used in the question above) and write a parquet file partitioned by "Season". This is a bit of a challenge aimed at getting you ready for material that will be covered later on in the course. Don't feel bad if you can't figure it out.

*Note that if any of your variable names contain spaces, spark will produce an error message with this call. That is why we are selecting ONLY the "Season" and "Value" fields. Ideally we should renamed those columns but we haven't gotten to that yet in this course but we will soon!*

In [21]:
# First, create a new dataframe with only "Season" and "Value" fields
pga_subset = pga.select("Season", "Value")

# Write as parquet file partitioned by "Season"
pga_subset.write.mode("overwrite").parquet("pga_season_partition/")

25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/11/15 19:40:31 WARN MemoryManager: Total allocation exceeds 95.

## 7. Write a partioned parquet file

You will need to use the same limited dataframe that you created in the previous question to accomplish this task as well. 

In [None]:

pga_subset.write.mode("overwrite").partitionBy("Season").parquet("pga_season_partitioned/")

Py4JJavaError: An error occurred while calling o1863.parquet.
: org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:643)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:656)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:369)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
		at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:643)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:656)
		at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
		at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
		at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
		at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 20 more
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.classic.SparkSession.sessionState()" because "sparkSession" is null
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:86)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
	... 20 more


## 8. Read in a partitioned parquet file

Now try reading in the partitioned parquet file you just created above. 

In [24]:
path = "pga_season_partitioned/"

my_parquet = spark.read.parquet(path)
my_parquet.show()

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
|   63|  2010|
|   88|  2010|
|   64|  2010|
|   64|  2010|
|   92|  2010|
|   75|  2010|
|   54|  2010|
|   76|  2010|
|   94|  2010|
|   82|  2010|
|   85|  2010|
|   79|  2010|
|   89|  2010|
|   88|  2010|
|   91|  2010|
+-----+------+
only showing top 20 rows


## 9. Reading in a set of paritioned parquet files

Now try only reading Seasons 2010, 2011 and 2012.

In [25]:
path = "pga_season_partitioned/"

my_part =  spark.read.option("basePath", path).parquet(path + "Season=2010", path + "Season=2011", path + "Season=2012")
my_part.show()

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
|   63|  2010|
|   88|  2010|
|   64|  2010|
|   64|  2010|
|   92|  2010|
|   75|  2010|
|   54|  2010|
|   76|  2010|
|   94|  2010|
|   82|  2010|
|   85|  2010|
|   79|  2010|
|   89|  2010|
|   88|  2010|
|   91|  2010|
+-----+------+
only showing top 20 rows


## 10. Create your own dataframe

Try creating your own dataframe below using PySparks *.createDataFrame* function. See if you can make one that contains 4 variables and at least 3 rows. 

Let's see how creative you can get on the content of the dataframe :)

In [155]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

student_df = [
    ("Deborah Aunga", "Gaming", 315, 72.5),
    ("Bob Marley", "Singing", 77, 68.2),
    ("Shawn Menendes", "Coding", 185, 34.8),
    ("Michael Schofield", "Driving", 300, 69.8),
    ("Jun Liu", "Dancing", 30, 77.7)
]

schema = StructType([
    StructField("Player_Name", StringType(), True),
    StructField("Best_Skill", StringType(), True),
    StructField("Avg_Distance_Yards", IntegerType(), True),
    StructField("Scoring_Average", DoubleType(), True)
])

custom_student_df = spark.createDataFrame(data=golf_data, schema=schema)


custom_student_df.show()

+-----------------+----------+------------------+---------------+
|      Player_Name|Best_Skill|Avg_Distance_Yards|Scoring_Average|
+-----------------+----------+------------------+---------------+
|      Tiger Woods|   Driving|               315|           72.5|
|     Rory McIlroy|   Putting|                28|           68.2|
|         Jon Rahm| Iron Play|               185|           70.1|
|Scottie Scheffler|   Driving|               305|           69.8|
|         Lydia Ko|   Putting|                26|           67.9|
+-----------------+----------+------------------+---------------+



In [28]:
spark.stop()

# **Part 2: Manipulating Data in DataFrames HW**


#### Let's get started applying what we learned in the lecure!

I've provided several questions below to help test and expand you knowledge from the code along lecture. So let's see what you've got!

First create your spark instance as we need to do at the start of every project.

In [1]:

from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Part_2.").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
appid = spark._jsc.sc().applicationId()
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/17 08:13:16 WARN Utils: Your hostname, aimsit, resolves to a loopback address: 127.0.1.1; using 10.6.248.147 instead (on interface wlp0s20f3)
25/11/17 08:13:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/17 08:13:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read in our Republican vs. Democrats Tweet DataFrame

Attached to the lecture

In [3]:
Tweetdf = spark.read.csv("Rep_vs_Dem_tweets.csv",header = True, inferSchema=True)

## About this dataframe

Extracted tweets from all of the representatives (latest 200 as of May 17th 2018)

**Source:** https://www.kaggle.com/kapastor/democratvsrepublicantweets#ExtractedTweets.csv

Use either .show() or .toPandas() check out the first view rows of the dataframe to get an idea of what we are working with.

In [4]:
Tweetdf.toPandas()

Unnamed: 0,Party,Handle,Tweet
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P..."
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...
3,"Congress has allocated about $18‚Ä¶""",,
4,Democrat,RepDarrenSoto,RT @NALCABPolicy: Meeting with @RepDarrenSoto ...
...,...,...,...
92484,Republican,RepTomPrice,Check out my op-ed on need for End Executive O...
92485,Republican,RepTomPrice,"Yesterday, Betty &amp; I had a great time lear..."
92486,Republican,RepTomPrice,We are forever grateful for the service and sa...
92487,Republican,RepTomPrice,Happy first day of school @CobbSchools! #CobbB...


**Prevent Truncation of view**

If the view you produced above truncated some of the longer tweets, see if you can prevent that so you can read the whole tweet.

In [5]:
Tweetdf.select('Tweet').show(20, truncate = False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|Tweet                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶ https://t.co/n3tggDLU1L |
|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|
|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |
|NULL                                                                                                                                     

**Print Schema**

First, check the schema to make sure the datatypes are accurate. 

In [7]:

Tweetdf.printSchema()

root
 |-- Party: string (nullable = true)
 |-- Handle: string (nullable = true)
 |-- Tweet: string (nullable = true)



## 1. Can you identify any tweet that mentions the handle @LatinoLeader using regexp_extract?

It doesn't matter how you identify the row, any identifier will do. You can test your script on row 5 from this dataset. That row contains @LatinoLeader. 

In [21]:
from pyspark.sql.functions import regexp_extract

Reg_with_handle = "@LatinoLeader"

result = Tweetdf.withColumn("is_leader_mention",regexp_extract("Tweet", "@LatinoLeader", 0)).filter("is_leader_mention != ''")

result.show(truncate=False)

+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|Party   |Handle       |Tweet                                                                                                                                       |is_leader_mention|
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|Democrat|RepDarrenSoto|RT @NALCABPolicy: Meeting with @RepDarrenSoto . Thanks for taking the time to meet with @LatinoLeader ED Marucci Guzman. #NALCABPolicy2018.‚Ä¶|@LatinoLeader    |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------+



In [20]:
from pyspark.sql.functions import*
Reg_with_handle = "@LatinoLeader"
result= Tweetdf.withColumn("is_leader_mentioned", regexp_extract("Tweet",Reg_with_handle,0)).filter("is_leader_mentioned !=''")
result.show(truncate=False)

+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|Party   |Handle       |Tweet                                                                                                                                       |is_leader_mentioned|
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|Democrat|RepDarrenSoto|RT @NALCABPolicy: Meeting with @RepDarrenSoto . Thanks for taking the time to meet with @LatinoLeader ED Marucci Guzman. #NALCABPolicy2018.‚Ä¶|@LatinoLeader      |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------+



## 2. Replace any value other than 'Democrate' or 'Republican' with 'Other' in the Party column.

We can see from the output below, that there are several other values other than 'Democrate' or 'Republican' in the Part column. We are assuming that this is dirty data that needs to be cleaned up.

In [56]:
from pyspark.sql.functions import col, when

Tweetdf_replace = Tweetdf.withColumn("Party",when(col("Party").isin("Democrat", "Republican"), col("Party")).otherwise("Other"))

Tweetdf_replace.show(truncate=False)

+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Party   |Handle       |Tweet                                                                                                                                       |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Democrat|RepDarrenSoto|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶ https://t.co/n3tggDLU1L |
|Democrat|RepDarrenSoto|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|
|Democrat|RepDarrenSoto|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |


In [27]:
Tweetdf_replace = Tweetdf.withColumn("Party", when(col("Party").isin("Democrat","Republican"),col("Party")).otherwise("Other"))
Tweetdf_replace.show()

+--------+-------------+--------------------+
|   Party|       Handle|               Tweet|
+--------+-------------+--------------------+
|Democrat|RepDarrenSoto|Today, Senate Dem...|
|Democrat|RepDarrenSoto|RT @WinterHavenSu...|
|Democrat|RepDarrenSoto|RT @NBCLatino: .@...|
|   Other|         NULL|                NULL|
|Democrat|RepDarrenSoto|RT @NALCABPolicy:...|
|Democrat|RepDarrenSoto|RT @Vegalteno: Hu...|
|Democrat|RepDarrenSoto|RT @EmgageActionF...|
|Democrat|RepDarrenSoto|Hurricane Maria l...|
|Democrat|RepDarrenSoto|RT @Tharryry: I a...|
|Democrat|RepDarrenSoto|RT @HispanicCaucu...|
|Democrat|RepDarrenSoto|RT @RepStephMurph...|
|Democrat|RepDarrenSoto|RT @AllSaints_FL:...|
|Democrat|RepDarrenSoto|.@realDonaldTrump...|
|Democrat|RepDarrenSoto|Thank you to my m...|
|Democrat|RepDarrenSoto|We paid our respe...|
|   Other|         NULL|                NULL|
|Democrat|RepDarrenSoto|RT @WinterHavenSu...|
|Democrat|RepDarrenSoto|Meet 12 incredibl...|
|Democrat|RepDarrenSoto|RT @wildli

In [57]:
Tweetdf_replace.select("Party").distinct().show()

+----------+
|     Party|
+----------+
|  Democrat|
|     Other|
|Republican|
+----------+



In [30]:
import pyspark.sql.functions as f
Tweetdf_replace.select("Handle", f.translate(f.col("Handle"),"Soto","").alias("Soto")).show()

+-------------+---------+
|       Handle|     Soto|
+-------------+---------+
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|         NULL|     NULL|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|         NULL|     NULL|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
|RepDarrenSoto|RepDarren|
+-------------+---------+
only showing top 20 rows


## 3. Delete all embedded links (ie. "https:....)

For example see the first row in the tweets dataframe. 

*Note: this may require an google search :)*

In [58]:
Tweetdf_no_links = Tweetdf_replace.withColumn("Tweet",regexp_replace(col("Tweet"),r"http\S+", ""))
Tweetdf_no_links.show(truncate = False)


+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Party   |Handle       |Tweet                                                                                                                                       |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Democrat|RepDarrenSoto|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶                         |
|Democrat|RepDarrenSoto|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|
|Democrat|RepDarrenSoto|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |



## 4. Remove any leading or trailing white space in the tweet column

In [59]:
from pyspark.sql.functions import trim

Tweetdf_trimmed = Tweetdf_no_links.withColumn("Tweet",trim(col("Tweet")))

Tweetdf_trimmed.show(truncate=False)

+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Party   |Handle       |Tweet                                                                                                                                       |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Democrat|RepDarrenSoto|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶                         |
|Democrat|RepDarrenSoto|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|
|Democrat|RepDarrenSoto|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |


## 5. Rename the 'Party' column to 'Dem_Rep'

No real reason here :) just wanted you to get practice doing this. 

In [60]:
Tweetdf_renamed = Tweetdf_trimmed.withColumnRenamed("Party","Dem_Rep")
Tweetdf_renamed.show(truncate=False)

+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Dem_Rep |Handle       |Tweet                                                                                                                                       |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Democrat|RepDarrenSoto|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶                         |
|Democrat|RepDarrenSoto|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|
|Democrat|RepDarrenSoto|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |


## 6. Concatenate the Party and Handle columns

Silly yes... but good practice.

pyspark.sql.functions.concat_ws(sep, *cols)[source] <br>
Concatenates multiple input string columns together into a single string column, using the given separator.

In [61]:
from pyspark.sql.functions import concat_ws

Tweetdf_concat = Tweetdf_renamed.withColumn("Party_handle",concat_ws("_",col("Dem_Rep"),col("Handle")))

Tweetdf_concat.show(truncate = False)

+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|Dem_Rep |Handle       |Tweet                                                                                                                                       |Party_handle          |
+--------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|Democrat|RepDarrenSoto|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶                         |Democrat_RepDarrenSoto|
|Democrat|RepDarrenSoto|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|Democrat_RepDarrenSoto|
|Democrat|RepDarrenSoto|RT @NBCLatino: .@RepDarrenS

In [35]:
## LOwercasing

Tweetdf.withColumn("Tweet",lower(Tweetdf.Tweet))
Tweetdf.select("Tweet").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|Tweet                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶ https://t.co/n3tggDLU1L |
|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia‚Ä¶|
|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |
|NULL                                                                                                                                     

## Challenge Question

Let's image that we want to analyze the hashtags that are used in these tweets. Can you extract all the hashtags you see?

In [62]:
from pyspark.sql.functions import split, explode

Tweetdf_all_hashtags = Tweetdf_concat.withColumn("Hashtags",explode(split(regexp_replace(col("Tweet"), r"[^#\w\s]", ""), " "))).filter(col("Hashtags").startswith("#"))

Tweetdf_all_hashtags.select("Tweet", "Hashtags").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|Tweet                                                                                                                                       |Hashtags                |
+--------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶                         |#SaveTheInternet        |
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House‚Ä¶                         |#NetNeutrality          |
|RT @NALCABPolicy: Meeting with @RepDarrenSoto . Thanks for taking the time to meet with @LatinoLeader ED Marucci Guzman. #NALCABPolicy2018.‚Ä¶|#NALCABPolic

# Let's create our own dataset to work with real dates

This is a dataset of patient visits from a medical office. It contains the patients first and last names, date of birth, and the dates of their first 3 visits. 

In [63]:
from pyspark.sql.types import *

md_office = [('Mohammed','Alfasy','1987-4-8','2016-1-7','2017-2-3','2018-3-2') \
            ,('Marcy','Wellmaker','1986-4-8','2015-1-7','2017-1-3','2018-1-2') \
            ,('Ginny','Ginger','1986-7-10','2014-8-7','2015-2-3','2016-3-2') \
            ,('Vijay','Doberson','1988-5-2','2016-1-7','2018-2-3','2018-3-2') \
            ,('Orhan','Gelicek','1987-5-11','2016-5-7','2017-1-3','2018-9-2') \
            ,('Sarah','Jones','1956-7-6','2016-4-7','2017-8-3','2018-10-2') \
            ,('John','Johnson','2017-10-12','2018-1-2','2018-10-3','2018-3-2') ]

df = spark.createDataFrame(md_office,['first_name','last_name','dob','visit1','visit2','visit3'])

df.show()
print(df.printSchema())

+----------+---------+----------+--------+---------+---------+
|first_name|last_name|       dob|  visit1|   visit2|   visit3|
+----------+---------+----------+--------+---------+---------+
|  Mohammed|   Alfasy|  1987-4-8|2016-1-7| 2017-2-3| 2018-3-2|
|     Marcy|Wellmaker|  1986-4-8|2015-1-7| 2017-1-3| 2018-1-2|
|     Ginny|   Ginger| 1986-7-10|2014-8-7| 2015-2-3| 2016-3-2|
|     Vijay| Doberson|  1988-5-2|2016-1-7| 2018-2-3| 2018-3-2|
|     Orhan|  Gelicek| 1987-5-11|2016-5-7| 2017-1-3| 2018-9-2|
|     Sarah|    Jones|  1956-7-6|2016-4-7| 2017-8-3|2018-10-2|
|      John|  Johnson|2017-10-12|2018-1-2|2018-10-3| 2018-3-2|
+----------+---------+----------+--------+---------+---------+

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- visit1: string (nullable = true)
 |-- visit2: string (nullable = true)
 |-- visit3: string (nullable = true)

None


Oh no! The dates are still stored as text... let's try converting them again and see if we have any issues this time.

In [64]:
from pyspark.sql.functions import to_date

dates = ['dob', 'visit1', 'visit2', 'visit3']

for c in dates:
    df = df.withColumn(c, to_date(col(c), "yyyy-M-d"))

df.show(truncate=False)
df.printSchema()


+----------+---------+----------+----------+----------+----------+
|first_name|last_name|dob       |visit1    |visit2    |visit3    |
+----------+---------+----------+----------+----------+----------+
|Mohammed  |Alfasy   |1987-04-08|2016-01-07|2017-02-03|2018-03-02|
|Marcy     |Wellmaker|1986-04-08|2015-01-07|2017-01-03|2018-01-02|
|Ginny     |Ginger   |1986-07-10|2014-08-07|2015-02-03|2016-03-02|
|Vijay     |Doberson |1988-05-02|2016-01-07|2018-02-03|2018-03-02|
|Orhan     |Gelicek  |1987-05-11|2016-05-07|2017-01-03|2018-09-02|
|Sarah     |Jones    |1956-07-06|2016-04-07|2017-08-03|2018-10-02|
|John      |Johnson  |2017-10-12|2018-01-02|2018-10-03|2018-03-02|
+----------+---------+----------+----------+----------+----------+

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- visit1: date (nullable = true)
 |-- visit2: date (nullable = true)
 |-- visit3: date (nullable = true)



## 7. Can you calculate a variable showing the length of time between patient visits?

Compare visit1 to visit2 and visit2 to visit3 for all patients and see what the average length of time is between visits. Create an alias for it as well. 

In [65]:
from pyspark.sql.functions import datediff

df_diff = df.withColumn("days_visit1_to_visit2", datediff(col("visit2"), col("visit1"))).withColumn("days_visit2_to_visit3",datediff(col("visit3"), col("visit2")))

df_diff.show(truncate=False)


+----------+---------+----------+----------+----------+----------+---------------------+---------------------+
|first_name|last_name|dob       |visit1    |visit2    |visit3    |days_visit1_to_visit2|days_visit2_to_visit3|
+----------+---------+----------+----------+----------+----------+---------------------+---------------------+
|Mohammed  |Alfasy   |1987-04-08|2016-01-07|2017-02-03|2018-03-02|393                  |392                  |
|Marcy     |Wellmaker|1986-04-08|2015-01-07|2017-01-03|2018-01-02|727                  |364                  |
|Ginny     |Ginger   |1986-07-10|2014-08-07|2015-02-03|2016-03-02|180                  |393                  |
|Vijay     |Doberson |1988-05-02|2016-01-07|2018-02-03|2018-03-02|758                  |27                   |
|Orhan     |Gelicek  |1987-05-11|2016-05-07|2017-01-03|2018-09-02|241                  |607                  |
|Sarah     |Jones    |1956-07-06|2016-04-07|2017-08-03|2018-10-02|483                  |425                  |
|

## 8. Can you calculate the age of each patient?

In [66]:
from pyspark.sql.functions import current_date, floor

df_age = df.withColumn("age",floor(datediff(current_date(), col("dob")) / 365))

df_age.select("first_name", "last_name", "dob", "age").show(truncate=False)


+----------+---------+----------+---+
|first_name|last_name|dob       |age|
+----------+---------+----------+---+
|Mohammed  |Alfasy   |1987-04-08|38 |
|Marcy     |Wellmaker|1986-04-08|39 |
|Ginny     |Ginger   |1986-07-10|39 |
|Vijay     |Doberson |1988-05-02|37 |
|Orhan     |Gelicek  |1987-05-11|38 |
|Sarah     |Jones    |1956-07-06|69 |
|John      |Johnson  |2017-10-12|8  |
+----------+---------+----------+---+



## 9. Can you extract the month from the first visit column and call it "Month"?

In [67]:
from pyspark.sql.functions import month

df_month = df.withColumn("Month",month(col("visit1")))

df_month.select("first_name", "visit1", "Month").show(truncate=False)


+----------+----------+-----+
|first_name|visit1    |Month|
+----------+----------+-----+
|Mohammed  |2016-01-07|1    |
|Marcy     |2015-01-07|1    |
|Ginny     |2014-08-07|8    |
|Vijay     |2016-01-07|1    |
|Orhan     |2016-05-07|5    |
|Sarah     |2016-04-07|4    |
|John      |2018-01-02|1    |
+----------+----------+-----+



## 10. Challenges with working with date and timestamps

Let's read in the supermarket sales dataframe attached to the lecture now and see some of the issues that can come up when working with date and timestamps values.

In [68]:
supermarket_df = spark.read.csv("supermarket_sales.csv", header=True, inferSchema=True)
supermarket_df.printSchema()
supermarket_df.show(5)


root
 |-- Invoice ID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Unit price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Tax 5%: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- gross margin percentage: double (nullable = true)
 |-- gross income: double (nullable = true)
 |-- Rating: double (nullable = true)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-------------------+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   

## About this dataset

The growth of supermarkets in most populated cities are increasing and market competitions are also high. The dataset is one of the historical sales of supermarket company which has recorded in 3 different branches for 3 months data. 

 - Attribute information
 - Invoice id: Computer generated sales slip invoice identification number
 - Branch: Branch of supercenter (3 branches are available identified by A, B and C).
 - City: Location of supercenters
 - Customer type: Type of customers, recorded by Members for customers using member card and Normal for without member card.
 - Gender: Gender type of customer
 - Product line: General item categorization groups - Electronic accessories, Fashion accessories, Food and beverages, Health and beauty, Home and lifestyle, Sports and travel
 - Unit price: Price of each product in USD
 - Quantity: Number of products purchased by customer
 - Tax: 5% tax fee for customer buying
 - Total: Total price including tax
 - Date: Date of purchase (Record available from January 2019 to March 2019)
 - Time: Purchase time (10am to 9pm)
 - Payment: Payment used by customer for purchase (3 methods are available ‚Äì Cash, Credit card and Ewallet)
 - COGS: Cost of goods sold
 - Gross margin percentage: Gross margin percentage
 - Gross income: Gross income
 - Rating: Customer stratification rating on their overall shopping experience (On a scale of 1 to 10)

**Source:** https://www.kaggle.com/aungpyaeap/supermarket-sales

### View dataframe and schema as usual

In [146]:
supermarket_df = spark.read.csv("supermarket_sales.csv", header=True, inferSchema=True)

supermarket_df.show(5)

supermarket_df.printSchema()


+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-------------------+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date|               Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-------------------+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|2025-11-15 13:08:00|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|2025-11-15 10:29:00|       Cash|  76.4|            4.761904762|      

### Convert date field to date type

Looks like we need to convert the date field into a date type. Let's go ahead and do that..

In [147]:
supermarket_df.select("Date").show(6)

+---------+
|     Date|
+---------+
| 1/5/2019|
| 3/8/2019|
| 3/3/2019|
|1/27/2019|
| 2/8/2019|
|3/25/2019|
+---------+
only showing top 6 rows


In [148]:
supermarket_df.select("Date").show(5, truncate=False)


+---------+
|Date     |
+---------+
|1/5/2019 |
|3/8/2019 |
|3/3/2019 |
|1/27/2019|
|2/8/2019 |
+---------+
only showing top 5 rows


In [149]:
from pyspark.sql.functions import col, to_date, month

supermarket_df.select('Date',to_date(supermarket_df.Date,'M/d/yyyy').alias('NewDate'), month(to_date(supermarket_df.Date,'M/d/yyyy')).alias('Month'),).show(3)

+--------+----------+-----+
|    Date|   NewDate|Month|
+--------+----------+-----+
|1/5/2019|2019-01-05|    1|
|3/8/2019|2019-03-08|    3|
|3/3/2019|2019-03-03|    3|
+--------+----------+-----+
only showing top 3 rows


### How can we extract the month value from the date field?

If you had trouble converting the date field in the previous question think about a more creative solution to extract the month from that field.

In [150]:
from pyspark.sql.functions import split, col

# If date format is like "2023/01/15" or "01/15/2023"
supermarket_df = supermarket_df.withColumn("Month", split(col("Date"), "/")[0])

supermarket_df.select("Date", "Month").show(5)

+---------+-----+
|     Date|Month|
+---------+-----+
| 1/5/2019|    1|
| 3/8/2019|    3|
| 3/3/2019|    3|
|1/27/2019|    1|
| 2/8/2019|    2|
+---------+-----+
only showing top 5 rows


## 11.0 Working with Arrays

Here is a dataframe of reviews from the movie the Dark Night.

In [None]:
from pyspark.sql.functions import *

values = [(5,'Epic. This is the best movie I have EVER seen'), \
          (4,'Pretty good, but I would have liked to seen better special effects'), \
          (3,'So so. Casting could have been improved'), \
          (5,'The most EPIC movie of the year! Casting was awesome. Special effects were so intense.'), \
          (4,'Solid but I would have liked to see more of the love story'), \
          (5,'THE BOMB!!!!!!!')]
reviews = spark.createDataFrame(values,['rating', 'review_txt'])

reviews.show(6,False)

+------+--------------------------------------------------------------------------------------+
|rating|review_txt                                                                            |
+------+--------------------------------------------------------------------------------------+
|5     |Epic. This is the best movie I have EVER seen                                         |
|4     |Pretty good, but I would have liked to seen better special effects                    |
|3     |So so. Casting could have been improved                                               |
|5     |The most EPIC movie of the year! Casting was awesome. Special effects were so intense.|
|4     |Solid but I would have liked to see more of the love story                            |
|5     |THE BOMB!!!!!!!                                                                       |
+------+--------------------------------------------------------------------------------------+



## 11.1 Let's see if we can create an array off of the review text column and then derive some meaningful results from it.

**But first** we need to clean the rview_txt column to make sure we can get what we need from our analysis later on. So let's do the following:

1. Remove all punctuation
2. lower case everything
3. Remove white space (trim)
3. Then finally, split the string

In [None]:
from pyspark.sql.functions import regexp_replace, lower, split

reviews_clean = reviews.withColumn("clean_review",split(trim(lower(regexp_replace(col("review_txt"), r"[^\w\s]", "")))," "))

reviews_clean.show(truncate=False)


+------+--------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|rating|review_txt                                                                            |clean_review                                                                                       |
+------+--------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|5     |Epic. This is the best movie I have EVER seen                                         |[epic, this, is, the, best, movie, i, have, ever, seen]                                            |
|4     |Pretty good, but I would have liked to seen better special effects                    |[pretty, good, but, i, would, have, liked, to, seen, better, special, effects]                     |
|3     |So so. Casti

                                                                                

## 11.2 Alright now let's see if we can find which reviews contain the word 'Epic'

In [None]:
from pyspark.sql.functions import array_contains

epic_reviews = reviews_clean.filter(array_contains(col("clean_review"), "epic"))

epic_reviews.show(truncate=False)


+------+--------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|rating|review_txt                                                                            |clean_review                                                                                       |
+------+--------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|5     |Epic. This is the best movie I have EVER seen                                         |[epic, this, is, the, best, movie, i, have, ever, seen]                                            |
|5     |The most EPIC movie of the year! Casting was awesome. Special effects were so intense.|[the, most, epic, movie, of, the, year, casting, was, awesome, special, effects, were, so, intense]|
+------+------------

In [93]:
spark.stop()

# **Part 3: Joining and Appending DataFrames in PySpark HW**

Now it's time to test your knowledge and further engrain the concepts we touched on in the lectures. Let's go ahead and get started.




**As always let's start our Spark instance.**

In [94]:

from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Part_3.").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
appid = spark._jsc.sc().applicationId()
spark

## Read in the database

Let cotinue working with our college courses dataframe to get some more insights and practice what we have learned!Let's read in the whole database using the loop function that we learned about in the lecture to automatically read in all the datasets from the uw-madision-courses folder (there are too many datasets to each one individually.

In [108]:
os.listdir('../Assignments')

['teachings.csv',
 'database.sqlite3',
 'subjects.csv',
 'sections.csv',
 'pga_season_partition',
 'instructors.csv',
 'rooms.csv',
 'Assignment1.ipynb',
 'Rep_vs_Dem_tweets.csv',
 'pga_season_partitioned',
 'pga_tour_historical.csv',
 'pga_season_partitioned.parquet',
 'schedules.csv',
 'subject_memberships.csv',
 'supermarket_sales.csv',
 'course_offerings.csv',
 'grade_distributions.csv',
 'courses.csv']

In [112]:
import os

path = "../Assignments/"

dataframes ={}

for filename in os.listdir(path):
    if filename.endswith('.csv'):
        filename_list = filename.split(".")
        df_name = filename_list[0]
        dataframes[df_name] = spark.read.csv(os.path.join(path, filename), header=True, inferSchema=True)

print(dataframes)        

{'teachings': DataFrame[instructor_id: int, section_uuid: string], 'subjects': DataFrame[code: string, name: string, abbreviation: string], 'sections': DataFrame[uuid: string, course_offering_uuid: string, section_type: string, number: int, room_uuid: string, schedule_uuid: string], 'instructors': DataFrame[id: int, name: string], 'rooms': DataFrame[uuid: string, facility_code: string, room_code: string], 'Rep_vs_Dem_tweets': DataFrame[Party: string, Handle: string, Tweet: string], 'pga_tour_historical': DataFrame[Player Name: string, Season: int, Statistic: string, Variable: string, Value: string], 'schedules': DataFrame[uuid: string, start_time: int, end_time: int, mon: boolean, tues: boolean, wed: boolean, thurs: boolean, fri: boolean, sat: boolean, sun: boolean], 'subject_memberships': DataFrame[subject_code: int, course_offering_uuid: string], 'supermarket_sales': DataFrame[Invoice ID: string, Branch: string, City: string, Customer type: string, Gender: string, Product line: strin

Now check the contents of a few of the dataframses that were read in above.

In [117]:
dataframes['courses'].printSchema()

dataframes['rooms'].show(3, truncate=False)

root
 |-- uuid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number: integer (nullable = true)

+------------------------------------+-------------+---------+
|uuid                                |facility_code|room_code|
+------------------------------------+-------------+---------+
|04368a56-c959-3e4b-8b3d-f4cc3538fea5|OFF CAMPUS   |null     |
|2cc50da3-ef0e-3572-a557-ca44930a0688|0032         |0249     |
|ebbf62b4-2ac3-356b-b0fa-7897f4446a17|0032         |B101     |
+------------------------------------+-------------+---------+
only showing top 3 rows


## Recap: About this database

You will notice that there are several more tables in the uw-madision-courses folder than there are read in above. This so that you will have a chance to practice your own custom joins and learn about the relationships between a real database work. Sometimes we don't know how they are related and we need to figure it out! I'll save that for the HW :) 

Here is a look at some of the important variables we can use to join our tables:

 - course_offerings: uuid, course_uuid, term_code, name
 - instructors: id, name
 - schedules: uuid
 - sections: uuid, course_offering_uuid,room_uuid, schedule_uuid
 - teachings: instructor_id, section_uuid
 - courses: uuid
 - grade_distributions: course_offering_uuid,section_number
 - rooms: uuid, facility_code, room_code
 - subjects: code
 - subject_memberships: subject_code, course_offering_uuid
 
 **Source:** https://www.kaggle.com/Madgrades/uw-madison-courses
 
So alright, let's use this information to discover some insights from this data!

## 1a. Can you assign the room numbers to each section of each course?

Show only the rooms uuid, facility code, room number, term code and the name of the course from the course_offerings table.

In [120]:
rooms = dataframes['rooms']
sections = dataframes['sections']
df_join_1 = rooms.join(sections, rooms.uuid == sections.room_uuid, how = 'left').select([rooms.uuid,rooms.facility_code,sections.course_offering_uuid,"number"])
df_join_1.limit(4).toPandas()

Unnamed: 0,uuid,facility_code,course_offering_uuid,number
0,ed828265-475b-31b4-b9a8-daec2a600449,32,a71789f0-09d9-3374-bd21-04a719a62af8,3
1,ed828265-475b-31b4-b9a8-daec2a600449,32,3c59e623-7e61-343a-951a-765cb62d3304,8
2,ed828265-475b-31b4-b9a8-daec2a600449,32,dd3caaea-05bf-383a-afa4-52297c455208,7
3,ed828265-475b-31b4-b9a8-daec2a600449,32,dd3caaea-05bf-383a-afa4-52297c455208,4


## 1b. Now show same output as above but for only facility number 0469 (facility_code)

In [121]:
df_facility_code = df_join_1.filter(df_join_1.facility_code == '0469')
df_facility_code.limit(4).toPandas()

Unnamed: 0,uuid,facility_code,course_offering_uuid,number
0,b7211db3-303f-3605-bead-237e1f663932,469,33d6238c-de54-3bb1-85e5-bb711ca51d79,1
1,b7211db3-303f-3605-bead-237e1f663932,469,a541beec-9660-3923-b500-e23939b7463a,1
2,b7211db3-303f-3605-bead-237e1f663932,469,a541beec-9660-3923-b500-e23939b7463a,1
3,b7211db3-303f-3605-bead-237e1f663932,469,7f4b103d-57dd-3340-8def-5d79e6ce7dbe,1


## 2. Count how many sections are offered for each subject for each facility

*Note: this will involve a groupby*

In [None]:
sections_offered = df_join_1.join(dataframes

{"ts": "2025-11-15 20:58:24.764", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `room_code` cannot be resolved. Did you mean one of the following? [`number`, `uuid`, `subject_code`, `facility_code`, `course_offering_uuid`]. SQLSTATE: 42703", "context": {"file": "jdk.internal.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o1492.count.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `room_code` cannot be resolved. Did you mean one of the following? [`number`, `uuid`, `subject_code`, `facility_code`, `course_offering_uuid`]. SQLSTATE: 42703;\n'Aggregate [facility_code#2589, 'room_code], [facility_code#2589, 'room_code, count(1) 

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `room_code` cannot be resolved. Did you mean one of the following? [`number`, `uuid`, `subject_code`, `facility_code`, `course_offering_uuid`]. SQLSTATE: 42703;
'Aggregate [facility_code#2589, 'room_code], [facility_code#2589, 'room_code, count(1) AS count#2857L]
+- Join Inner, (course_offering_uuid#2547 = course_offering_uuid#2678)
   :- Project [uuid#2588, facility_code#2589, course_offering_uuid#2547, number#2549]
   :  +- Join LeftOuter, (uuid#2588 = room_uuid#2550)
   :     :- Relation [uuid#2588,facility_code#2589,room_code#2590] csv
   :     +- Relation [uuid#2546,course_offering_uuid#2547,section_type#2548,number#2549,room_uuid#2550,schedule_uuid#2551] csv
   +- Relation [subject_code#2677,course_offering_uuid#2678] csv


## 3. What are the hardest classes?

Let's see if we can figure out which classes are the hardest by seeing how many students failed. Note that you will first need to aggregate the grades table by the course uuid to include all sections. Show the name of the course as well that you will need to get from the course_offering table.

## Challenge Question: Automating data entry errors

We see in the dataframe below that there are several typos of various animal names. If this was a large database of several millions of records, correcting these errors would be way too labor intensive. How can we automate correcting these errors?

*Hint: Leven...*

In [None]:
values = [('Monkey',10),('Monkay',36),('Mnky',123), \
          ('Elephant',48),('Elefant',16),('Ellafant',1), \
          ('Hippopotamus',48),('Hipopotamus',16),('Hippo',1)]
zoo = spark.createDataFrame(values,['Animal','age'])
zoo.show()

                                                                                

+------------+---+
|      Animal|age|
+------------+---+
|      Monkey| 10|
|      Monkay| 36|
|        Mnky|123|
|    Elephant| 48|
|     Elefant| 16|
|    Ellafant|  1|
|Hippopotamus| 48|
| Hipopotamus| 16|
|       Hippo|  1|
+------------+---+



In [None]:

from pyspark.sql.functions import when, col
zoo_fixed = zoo.withColumn(
    'Animal',
    when(col('Animal') == 'Monkay', 'Monkey')
    .when(col('Animal') == 'Mnky', 'Monkey')
    .when(col('Animal') == 'Elefant', 'Elephant')
    .when(col('Animal') == 'Ellafant', 'Elephant')
    .when(col('Animal') == 'Hipopotamus', 'Hippopotamus')
    .when(col('Animal') == 'Hippo', 'Hippopotamus')
    .otherwise(col('Animal'))
)

zoo_fixed.show()

+------------+---+
|      Animal|age|
+------------+---+
|      Monkey| 10|
|      Monkey| 36|
|      Monkey|123|
|    Elephant| 48|
|    Elephant| 16|
|    Elephant|  1|
|Hippopotamus| 48|
|Hippopotamus| 16|
|Hippopotamus|  1|
+------------+---+

