<div  align='center'><img src='https://s3.amazonaws.com/weclouddata/images/logos/wcd_logo_new_2.png' width='30%'></div >

<p style="font-size:20px;text-align:center"><b><font color='#F39A54'>Data Engineering Diploma</font></b></p>

<h2 align='center'> WeCloudData Data Engineer Spark Exercise 1 </h2>

<br>

Please download data from [HERE](s3://weclouddata/data/data/pyspark_exercises_data.zip)

# Section 1. Reading, Writing and Validating Data in PySpark HW Solutions



In [0]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate()

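# Note: getExecutorMemoryStatus() has one entry per block manager (driver included), so this counts executors rather than CPU cores
executors = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", executors, "executor(s)")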
spark

## Next let's start by reading a basic csv dataset

In [0]:
path ="Datasets/"

# Some csv data
pga = spark.read.csv(path+'pga_tour_historical.csv',inferSchema=True,header=True)

## S1.1. View first 5 lines of dataframe
First generate a view of the first 5 lines of the dataframe to get an idea of what is inside. We went over two ways of doing this... see if you can remember BOTH ways. 

In [0]:
pga.show(5)

In [0]:
# I prefer this method
pga.limit(5).toPandas()

## S1.2. Print the schema details

Now print the details of the dataframe's schema that Spark inferred to ensure that it was inferred correctly. Sometimes it is not inferred correctly, so we need to watch out!

In [0]:
pga.printSchema()  # prints the schema itself; wrapping it in print() would also print "None"
print("")
print(pga.columns)
print("")
# Not so fond of this method, but to each their own
print(pga.describe())

## S1.3. Edit the schema during the read in


In [0]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType

In [0]:
data_schema = [StructField("Player Name", StringType(), True), \
               StructField("Season", IntegerType(), True), \
               StructField("Statistic", StringType(), True), \
               StructField("Variable", StringType(), True), \
               StructField("Value", IntegerType(), True)]

In [0]:
final_struc = StructType(fields=data_schema)

In [0]:
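path ="Datasets/"
# header=True skips the header row; without it the column names would be read in as a data row
pga = spark.read.csv(path+'pga_tour_historical.csv', schema=final_struc, header=True)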

In [0]:
pga.printSchema()
# That's better!

## S1.4. Generate summary statistics for only one variable


In [0]:
# Neat "describe" function
pga.describe(['Value']).show()

## S1.5. Generate summary statistics for TWO variables


In [0]:
pga.select("Season", "Value").summary("count", "min", "max").show()

## S1.6. Write a parquet file

In [0]:
df = pga.select("Season","Value")
df.write.mode("overwrite").parquet("partition_parquet/")

## S1.7. Write a partitioned parquet file


In [0]:
df.write.mode("overwrite").partitionBy("Season").parquet("partitioned_parquet/")
df.show(5)

## S1.8. Read in a partitioned parquet file

Now try reading in the partitioned parquet file you just created above. 

In [0]:
path = "partitioned_parquet/" #Note: if you add a * to the end of the path, the Season var will be automatically dropped
parquet = spark.read.parquet(path)
        
parquet.show()

## S1.9. Reading in a set of partitioned parquet files

Now try only reading Seasons 2010, 2011 and 2012.

In [0]:
# Notice that this method only gives you the "Value" column
path = "partitioned_parquet/"
partitioned = spark.read.parquet(path+'Season=2010/',\
                             path+'Season=2011/', \
                             path+'Season=2012/')

partitioned.show(5)

In [0]:
# We need to use this method to get the "Season" and "Value" Columns
path = "partitioned_parquet/"
dataframe = spark.read.option("basePath", path).parquet(path+'Season=2010/',\
                                                                path+'Season=2011/', \
                                                                path+'Season=2012/')
dataframe.show(5)

## S1.10. Create your own dataframe


In [0]:
values = [('Kyle',10,'A',1),('Melbourne',36,'A',1),('Nina',123,'A',1),('Stephen',48,'B',2),('Orphan',16,'B',2),('Imran',1,'B',2)]
df = spark.createDataFrame(values,['name','age','AB','Number'])
df.show()

# Section 2. Manipulating Data in DataFrames Solutions

In [0]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("Manip").getOrCreate()

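# Note: getExecutorMemoryStatus() has one entry per block manager (driver included), so this counts executors rather than CPU cores
executors = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", executors, "executor(s)")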
spark

## Read in our Republican vs. Democrats Tweet DataFrame

In [0]:
path='Datasets/'
tweets = spark.read.csv(path+'Rep_vs_Dem_tweets.csv',inferSchema=True,header=True)

In [0]:
tweets.limit(4).toPandas()

**Prevent Truncation of view**

If the view you produced above truncated some of the longer tweets, see if you can prevent that so you can read the whole tweet.

In [0]:
tweets.select("tweet").show(3,False)

As we can see, this dataset contains three columns: the tweet content, the Twitter handle that posted the tweet, and the party that the tweet belongs to. But it looks like the tweets could use some cleaning, especially if we are going to use this for some kind of machine learning analysis. Let's see if we can make this an even richer dataset using the techniques we learned in the lecture!

**Print Schema**

First, check the schema to make sure the datatypes are accurate. 

In [0]:
tweets.printSchema()

## S2.1. Can you identify any tweet that mentions the handle @LatinoLeader using regexp_extract?

It doesn't matter how you identify the row, any identifier will do. You can test your script on row 5 from this dataset. That row contains @LatinoLeader. 

In [0]:
from pyspark.sql.functions import * #regexp_extract
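# Capture the handle itself (group 1); no surrounding characters are required, so mentions at the start or end of a tweet also match
latino = tweets.withColumn('Latino_Mentions',regexp_extract(tweets.Tweet, '(@LatinoLeader)',1))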
latino.limit(6).toPandas()
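
A pattern that requires a character on both sides of the handle, such as `(.)(@LatinoLeader)(.)`, cannot match when the handle is the very first or very last thing in a tweet. The sketch below checks the simpler pattern in plain Python (no Spark session needed). The helper name `extract_mention` and the sample tweets are hypothetical, and the empty-string fallback is there to mirror what `regexp_extract` returns when nothing matches.

```python
import re

def extract_mention(text, handle="@LatinoLeader"):
    """Return the handle if it occurs in text, else '' (like regexp_extract on no match)."""
    match = re.search("(" + re.escape(handle) + ")", text)
    return match.group(1) if match else ""

# Hypothetical sample tweets, not taken from the dataset
samples = [
    "@LatinoLeader thank you for hosting us",  # handle at the very start
    "Great event with @LatinoLeader",          # handle at the very end
    "No mention here",
]
for s in samples:
    print(repr(extract_mention(s)))
```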

## S2.2. Replace any value other than 'Democrat' or 'Republican' with 'Other' in the Party column.

We can see from the output below that there are several values other than 'Democrat' or 'Republican' in the Party column. We are assuming that this is dirty data that needs to be cleaned up.

In [0]:
# We haven't gotten to this yet so it's a bit of a teaser :)
from pyspark.sql.functions import *
counts = tweets.groupBy("Party").count()
counts.orderBy(desc("count")).show(6)

In [0]:
from pyspark.sql.functions import when

clean = tweets.withColumn('Party', when(tweets.Party == 'Democrat', 'Democrat').when(tweets.Party == 'Republican', 'Republican').otherwise('Other'))
counts = clean.groupBy("Party").count()
counts.orderBy(desc("count")).show(16)

## S2.3. Delete all embedded links (i.e. "https:...")

For example see the first row in the tweets dataframe. 

*Note: this may require a Google search :)*

In [0]:
print("OG Tweet")
tweets.select("tweet").show(1,False)

In [0]:
# And here is the solution
print("Cleaned Tweet")
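# Raw string (r'...') so that Python does not try to interpret backslash sequences such as \w and \.
tweets.withColumn('cleaned', regexp_replace('Tweet', r'(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?', '')).select("cleaned").show(1,False)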
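
Before running a long regex over a whole DataFrame, it can help to try it on a single string. The sketch below applies the same URL pattern with Python's `re` module. The helper `strip_links` and the sample tweet are hypothetical, and it assumes Python's `re` and the Java regex engine used by Spark treat this particular pattern the same way, which holds for the constructs used here but not for every regex feature.

```python
import re

# Same pattern as in the Spark solution, compiled once
URL_PATTERN = re.compile(
    r"(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?"
)

def strip_links(text):
    """Remove every embedded link from a piece of text."""
    return URL_PATTERN.sub("", text)

# Hypothetical sample tweet, not taken from the dataset
sample = "Big news today https://t.co/abc123 stay tuned"
print(strip_links(sample))
```

Removing a link leaves the spaces on both sides of it behind, so a follow-up trim or whitespace cleanup may still be useful.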

## S2.4. Remove any leading or trailing white space in the tweet column

In [0]:
from pyspark.sql.functions import *
tweets.select("Tweet").show(5,False)
tweets.select('Tweet', trim(tweets.Tweet)).show(5,False)

## S2.5. Rename the 'Party' column to 'Dem_Rep'

No real reason here :) just wanted you to get practice doing this. 

In [0]:
renamed = tweets.withColumnRenamed('Party','Dem_Rep')
renamed.limit(4).toPandas()

## S2.6. Concatenate the Party and Handle columns

Silly yes... but good practice.

`pyspark.sql.functions.concat_ws(sep, *cols)` ([source](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.concat_ws.html))

Concatenates multiple input string columns together into a single string column, using the given separator.

In [0]:
from pyspark.sql.functions import *
tweets.select(tweets.Party,tweets.Handle,concat_ws(' ', tweets.Party, tweets.Handle).alias('Concatenated')).show(5,False)

## Challenge Question

Let's imagine that we want to analyze the hashtags that are used in these tweets. Can you extract all the hashtags you see?

In [0]:
from pyspark.sql.functions import *
# Parentheses mark a subexpression (capture group) within a larger expression
# The . matches any character other than a new line
# | means "or", so (.|) optionally matches one character before the #
# \w+ matches one or more word characters (letters, digits, underscore)
pattern = r'(.|)(#)(\w+)'
# * is used to match the preceding character zero or more times.
# ? will match the preceding character zero or one times, but no more (after * it makes the match lazy).
# $ is used to match the ending position in a string. 
split_pattern = r'.*?({pattern})'.format(pattern=pattern)
end_pattern = r'(.*{pattern}).*?$'.format(pattern=pattern)

# $1 here refers to the first capture group of the regex match
# The , separates each match with a comma so we can split the string into an array later
df2 = tweets.withColumn('a', regexp_replace('Tweet', split_pattern, '$1,')).where(col('Tweet').like('%#%'))
df2.select('a').show(3,False)
# Remove all the other results that came up
df3 = df2.withColumn('a', regexp_replace('a', end_pattern, '$1'))
df3.select('a').show(3,False)
# Finally create an array from the result by splitting on the comma
df4 = df3.withColumn('a', split('a', r','))
df4.select('a').show(3,False)
df4.limit(3).toPandas()
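
For comparison, here is a much shorter way to express the same idea on a single string in plain Python. This is an alternative sketch and not the Spark approach used above: `re.findall` returns every non-overlapping match of `#\w+`, so no replace-and-split steps are needed. The helper name `extract_hashtags` and the sample text are hypothetical.

```python
import re

def extract_hashtags(text):
    """Return all hashtags (a # followed by word characters) in order of appearance."""
    return re.findall(r"#\w+", text)

# Hypothetical sample tweet, not taken from the dataset
print(extract_hashtags("Proud to vote #Election2018 with #TeamUSA!"))
```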

## Let's create our own dataset to work with real dates

This is a dataset of patient visits from a medical office. It contains the patients' first and last names, dates of birth, and the dates of their first 3 visits. 

In [0]:
from pyspark.sql.types import *

md_office = [('Mohammed','Alfasy','1987-4-8','2016-1-7','2017-2-3','2018-3-2') \
            ,('Marcy','Wellmaker','1986-4-8','2015-1-7','2017-1-3','2018-1-2') \
            ,('Ginny','Ginger','1986-7-10','2014-8-7','2015-2-3','2016-3-2') \
            ,('Vijay','Doberson','1988-5-2','2016-1-7','2018-2-3','2018-3-2') \
            ,('Orhan','Gelicek','1987-5-11','2016-5-7','2017-1-3','2018-9-2') \
            ,('Sarah','Jones','1956-7-6','2016-4-7','2017-8-3','2018-10-2') \
            ,('John','Johnson','2017-10-12','2018-1-2','2018-10-3','2018-3-2') ]

df = spark.createDataFrame(md_office,['first_name','last_name','dob','visit1','visit2','visit3']) # schema=final_struc

# Check to make sure it worked
df.show()
df.printSchema()

Oops! The dates are still stored as text... let's try converting them again and see if we have any issues this time.

In [0]:
# Convert the date columns into date types
df = df.withColumn("dob", df["dob"].cast(DateType())) \
        .withColumn("visit1", df["visit1"].cast(DateType())) \
        .withColumn("visit2", df["visit2"].cast(DateType())) \
        .withColumn("visit3", df["visit3"].cast(DateType()))

# Check to make sure it worked
df.show()
df.printSchema()

## S2.7. Can you calculate a variable showing the length of time between patient visits?

Compare visit1 to visit2 and visit2 to visit3 for all patients and see what the average length of time is between visits. Create an alias for it as well. 

In [0]:
from pyspark.sql.functions import *
diff1 = df.select(datediff(df.visit2, df.visit1).alias("diff"))
diff2 = df.select(datediff(df.visit3, df.visit2).alias("diff"))

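# Append the two dataframes together
diff_combo = diff1.union(diff2)
diff_combo.show(5)

# Average number of days between consecutive visits
diff_combo.agg(avg("diff").alias("avg_days_between_visits")).show()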

## S2.8. Can you calculate the age of each patient?

In [0]:
# We use the datediff function here as well (age is measured at the first visit)
# and divide by 365 to convert days into approximate years
# format_number rounds the result to one decimal place (and returns a string)
ages = df.select(format_number(datediff(df.visit1,df.dob)/365,1).alias("age"))
ages.show()
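
Dividing a day count by 365 ignores leap days, so the result drifts slightly for older patients. If whole years are needed, one option is to compare calendar fields directly. The sketch below shows that arithmetic in plain Python; the helper name `age_in_years` is hypothetical. In Spark, something along the lines of `months_between(...)/12` could serve a similar purpose, though that is a suggestion and not part of the original solution.

```python
from datetime import date

def age_in_years(dob, on_date):
    """Whole years between dob and on_date, counting a year only once the birthday has passed."""
    years = on_date.year - dob.year
    # Subtract one if the birthday has not happened yet in on_date's year
    if (on_date.month, on_date.day) < (dob.month, dob.day):
        years -= 1
    return years

# First patient in the dataset: born 1987-4-8, first visit 2016-1-7
print(age_in_years(date(1987, 4, 8), date(2016, 1, 7)))
```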

## S2.9. Can you extract the month from the first visit column and call it "Month"?

In [0]:
month1 = df.select(month(df['visit1']).alias("Month"))
month1.show(3)

In [0]:
# Bonus (not in lecture)
# If you wanted to make a list (or an array in this case) for all months, you could do this

df.select(array(month(df['visit1']),month(df['visit2'])).alias("Months")).show(3)

In [0]:
# Bonus (not in lecture)
# Or even a separate col for each month

df.select('*',month(df['visit1']).alias("Month1"),month(df['visit2']).alias("Month2")).show(3)

In [0]:
# Bonus (not in lecture)
# Or loop over all visit columns

# Get all visit column names
df_month_cols = [i for i in df.columns if i.startswith('visit')]

# Make a copy of our df
df2 = df

# Loop over relevant columns and add on month columns
for column in df_month_cols:
    # Find number of visit (this is straight-up Python, we don't need pyspark for this)
    num = str(column)[-1]
    # Create the naming convention for the new column  (python too)
    new_col_name = "Month" + num
    df2 = df2.withColumn(new_col_name,month(df2[column]))
df2.show()

## S2.10. Challenges with working with date and timestamps

Let's read in our supermarket sales dataframe and see some of the issues that can come up when working with date and timestamps values.

In [0]:
path = 'Datasets/'
sales = spark.read.csv(path+'supermarket_sales.csv',inferSchema=True,header=True)

In [0]:
sales.limit(6).toPandas()

In [0]:
sales.printSchema()

Looks like we need to convert the date field into a date type. Let's go ahead and do that.

In [0]:
from pyspark.sql.types import *
# from pyspark.sql.functions import *

print("Note that this method gives all null values")
df = sales.withColumn("Formatted Date", sales["Date"].cast(DateType()))
df = df.select("Date","Formatted Date")
print(df.limit(6).toPandas())

print(" ")
print("This pattern gives the wrong results: lowercase 'm' means minute, not month (notice that all months are January)")
sales.select('Date',to_date(sales.Date, 'm/d/yyyy').alias('Dateformatted'),month(to_date(sales.Date, 'm/d/yyyy')).alias('Month')).show(5)

print(" ")
print("But if we capitalize the M in the format, we get the correct results!")
sales.select('Date',to_date(sales.Date, 'M/d/yyyy').alias('Dateformatted'),month(to_date(sales.Date, 'M/d/yyyy')).alias('Month')).show(5)
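
The mix-up is easy to make because different libraries assign the letters differently. Spark's date patterns follow the Java convention (`M` is month, `m` is minute), while Python's `strptime` uses `%m` for month and `%M` for minute. The plain-Python sketch below parses the same `month/day/year` layout so the two conventions can be compared side by side; the helper name `parse_us_date` and the sample strings are hypothetical.

```python
from datetime import datetime

def parse_us_date(text):
    """Parse a month/day/year string such as '1/5/2019' into a date."""
    # In strptime, %m is month and %d is day (non-padded values are accepted)
    return datetime.strptime(text, "%m/%d/%Y").date()

for raw in ["1/5/2019", "3/8/2019"]:
    parsed = parse_us_date(raw)
    print(raw, "->", parsed.isoformat(), "month:", parsed.month)
```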

## Another way we can extract the month value from the date field

If your date format is uncommon, or you are not getting the expected results from the output above, you could also use this method to get what you need. 

In [0]:
# We need to get creative here
# First split the date field and get the month value 
df = sales.select('Date',split(sales.Date, '/')[0].alias('Month'),'Total')

# Verify everything worked correctly
print("Verify")
df.show(5)
df.printSchema()

## S2.11.0 Working with Arrays

Here is a dataframe of reviews of the movie The Dark Knight.

In [0]:
from pyspark.sql.functions import *

values = [(5,'Epic. This is the best movie I have EVER seen'), \
          (4,'Pretty good, but I would have liked to seen better special effects'), \
          (3,'So so. Casting could have been improved'), \
          (5,'The most EPIC movie of the year! Casting was awesome. Special effects were so intense.'), \
          (4,'Solid but I would have liked to see more of the love story'), \
          (5,'THE BOMB!!!!!!!')]
reviews = spark.createDataFrame(values,['rating', 'review_txt'])

reviews.show(6,False)

## S2.11.1 Let's see if we can create an array off of the review text column and then derive some meaningful results from it.

**But first** we need to clean the review_txt column to make sure we can get what we need from our analysis later on. So let's do the following:

1. Remove all punctuation
2. Lowercase everything
3. Remove white space (trim)
4. Then finally, split the string

In [0]:
# We can do 1-3 in one call here
df = reviews.withColumn("cleaned_reviews", trim(lower(regexp_replace(col('review_txt'),r'[^\sa-zA-Z0-9]', ''))))
df.show(1,False)

In [0]:
# Then split on the spaces!
df = df.withColumn("review_txt_array", split(col("cleaned_reviews"), " "))
df.show(1,False)

## S2.11.2 Alright now let's see if we can find which reviews contain the word 'Epic'

In [0]:
epic = df.withColumn("result",array_contains(col("review_txt_array"), "epic"))
epic.toPandas()
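
The clean, split, and membership steps can also be traced on a single string in plain Python, which makes it easier to see why the search term has to be lowercase `"epic"`. This is only a sketch of the same logic outside Spark; the helper name `clean_and_split` is hypothetical.

```python
import re

def clean_and_split(text):
    """Strip punctuation, lowercase, trim, then split on single spaces."""
    cleaned = re.sub(r"[^\sa-zA-Z0-9]", "", text).lower().strip()
    return cleaned.split(" ")

# Two reviews from the dataframe above
for review in ["Epic. This is the best movie I have EVER seen", "THE BOMB!!!!!!!"]:
    words = clean_and_split(review)
    print(words, "epic" in words)
```

Because everything is lowercased before splitting, searching for `"Epic"` with a capital E would never match.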

# Section 3. Handling Missing Data in PySpark HW Solutions



In [0]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("nulls").getOrCreate()

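# Note: getExecutorMemoryStatus() has one entry per block manager (driver included), so this counts executors rather than CPU cores
executors = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", executors, "executor(s)")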
spark

## Read in the dataset for this Notebook

In [0]:
df = spark.read.csv('Datasets/Weather.csv',inferSchema=True,header=True)

## About this dataset

**New York City Taxi Trip - Hourly Weather Data**

Here is some detailed weather data for the New York City Taxi Trips.

**Source:** https://www.kaggle.com/meinertsen/new-york-city-taxi-trip-hourly-weather-data

### Print a view of the first several lines of the dataframe to see what our data looks like

In [0]:
df.limit(8).toPandas()

### Print the schema 

So that we can see if we need to make any corrections to the data types.

In [0]:
df.printSchema()

## S3.1. How much missing data are we working with?

Get the count and percentage of missing values for each variable in the dataset to answer this question.

In [0]:
from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)
spark.createDataFrame(null_columns_calc_list, ['Column_With_Null_Value', 'Null_Values_Count','Null_Value_Percent']).show()

## S3.2. How many rows contain at least one null value?

We want to know how many rows we would lose if we used the `df.na.drop()` option. 

In [0]:
og_len = df.count()
drop_len = df.na.drop().count()
print("Total Rows in the DF: ",og_len)
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len*100)

Yikes! Everything would be dropped.

## S3.3. Drop the missing data

Drop any row that contains missing data across the whole dataset

In [0]:
dropped = df.na.drop()
dropped.limit(4).toPandas() 

# Note this statement is equivalent to the above:
# df.na.drop(how='any').limit(4).toPandas() 

Yep, we have no more data :(

## S3.4. Drop with a threshold

Count how many rows would be dropped if we kept only rows that have at least 12 non-null values (`thresh=12` drops any row with fewer than 12).

In [0]:
og_len = df.count()
drop_len = df.na.drop(thresh=12).count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len*100)
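
The direction of `thresh` is easy to get backwards: it is the minimum number of non-null values a row needs in order to be kept. The plain-Python sketch below spells out that rule on a tiny, made-up set of rows. The helper name `keep_with_thresh` is hypothetical, and it only treats `None` as missing, whereas Spark also treats NaN as missing in numeric columns.

```python
def keep_with_thresh(rows, thresh):
    """Keep rows that have at least `thresh` non-null values (None counts as null)."""
    return [row for row in rows if sum(v is not None for v in row) >= thresh]

# Made-up rows with 3, 2 and 1 non-null values respectively
rows = [(1, 2, 3), (1, None, 3), (None, None, 3)]
kept = keep_with_thresh(rows, thresh=2)
print("kept:", kept)
print("dropped:", len(rows) - len(kept))
```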

## S3.5. Drop rows according to specific column value

Now count how many rows would be dropped if you only drop rows whose values in the tempm column are null/NaN

In [0]:
og_len = df.count()
drop_len = df.na.drop(subset=["tempm"]).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len*100)

In [0]:
# Another way to do the above
og_len = df.count()
drop_len = df.filter(df.tempm.isNotNull()).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len*100)

## S3.6. Drop rows that are null across all columns

Count how many rows would be dropped if you only dropped rows where ALL the values are null

In [0]:
og_len = df.count()
drop_len = df.na.drop(how='all').count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len*100)

That's good news!

## S3.7. Fill in all the string columns' missing values with the word "N/A"

Make sure you don't edit the df dataframe itself. Create a copy of the df then edit that one.

In [0]:
null_fill = df.na.fill('N/A')
null_fill.limit(4).toPandas()

## S3.8. Fill in NaN values with averages for the tempm and tempi columns

*Note: you will first need to compute the averages for each column and then fill in with the corresponding value.*

In [0]:
def fill_with_mean(df, include=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in include
    ))
#     stats = stats.select(*(col(c).cast("int").alias(c) for c in stats.columns)) #IntegerType()
    return df.na.fill(stats.first().asDict())

updated_df = fill_with_mean(df, ["tempm","tempi"])
updated_df.limit(5).toPandas()
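
The two-step logic (compute each column's mean over the non-missing values, then substitute it where a value is missing) can be checked by hand on a few rows. The plain-Python sketch below does that with made-up values; the helper name `fill_with_mean_py` and the sample rows are hypothetical and are not drawn from the weather dataset.

```python
def fill_with_mean_py(rows, columns):
    """Replace None in the given columns with that column's mean of the non-missing values."""
    means = {}
    for c in columns:
        present = [r[c] for r in rows if r[c] is not None]
        if present:  # skip columns that are entirely missing
            means[c] = sum(present) / len(present)
    # Only missing values are replaced; existing values are left untouched
    return [
        {k: (means.get(k, v) if v is None else v) for k, v in r.items()}
        for r in rows
    ]

# Made-up rows for illustration
rows = [
    {"tempm": 10.0, "tempi": 50.0},
    {"tempm": None, "tempi": 54.0},
    {"tempm": 14.0, "tempi": None},
]
for r in fill_with_mean_py(rows, ["tempm", "tempi"]):
    print(r)
```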

<h2 align='center'><b><font color='#3F13E2'>That's it! Great Job! </font></b></h2>