# Spark 101 Exercise

In [None]:
# Import libraries
import pandas as pd
import numpy as np
from pydataset import data
from vega_datasets import data
from pyspark.sql.functions import col,min, max, sum, count, mean, avg,round,expr,concat,lit
from pyspark.sql.functions import regexp_extract, regexp_replace,when,month, year, quarter
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Ignre warnings
import warnings
warnings.filterwarnings('ignore')



# 1. Create a spark data frame that contains your favorite programming languages.

In [None]:
lang_list=('R','Python','JavaScrip','Java','Ruby','Rust','SQL','HTML','CSS')
language_df = pd.DataFrame(lang_list,columns=['language']
    
)

language_df.head()

In [None]:
df = spark.createDataFrame(language_df)

In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
df.count(),len(df.columns)

In [None]:
df.show(5)

# 2. Load the mpg dataset as a spark dataframe.

In [None]:
from pydataset import data
df = spark.createDataFrame(data('mpg'))
df

In [None]:
df.show(5)

### a.Create 1 column of output that contains a message like the one below:
The 1999 audi a4 has a 4 cylinder engine.

In [None]:
year_manufacturer_model = df.select('year', 'manufacturer', 'model').collect()[0]
cylinder = df.select('cyl').collect()[0][0]

print(f"The {year_manufacturer_model['year']} {year_manufacturer_model['manufacturer']} {year_manufacturer_model['model']} has a {cylinder} cylinder engine.")


In [None]:
df = df.withColumn(
    'engine_description',
    concat(
        lit("The "),
        df['year'],
        lit(" "),
        df['manufacturer'],
        lit(" "),
        df['model'],
        lit(" has a "),
        df['cyl'],
        lit(" cylinder engine.")
    )
)

df.show(truncate=False)

### For each vehicle.

b. Transform the trans column so that it only contains either manual or auto.

In [None]:
df.select(
    regexp_extract(df['trans'], r'^(manual|auto)', 1).alias('clean_trans')
).show(truncate=False)

In [None]:
df = df.withColumn(
    'engine_description',
    concat(
        lit("The "),
        df['year'],
        lit(" "),
        df['manufacturer'],
        lit(" "),
        df['model'],
        lit(" has a "),
        df['cyl'],
        lit(" cylinder engine.") )
)

df.show(truncate=False)

In [None]:
df = df.withColumn('clean_trans', regexp_extract(df['trans'], r'^(manual|auto)', 1))

df.show(truncate=False)

## 3. Load the tips dataset as a spark dataframe.

In [None]:
df = spark.createDataFrame(data('tips'))
df

In [None]:
df.show(5)

### a. What percentage of observations are smokers?

In [None]:
df.groupBy().agg((count(when(df.smoker == 'Yes', True)) / count('*') * 100)).show()

### b. Create a column that contains the tip percentage


In [None]:
df.groupBy().agg((count(df.tip) / count('total_bill'))).show()

In [None]:
total_tip = df.select(sum('tip')).first()[0]
df = df.withColumn('tip_percentage', (col('tip') / total_tip) * 100)

df.show()

### c. Calculate the average tip percentage for each combination of sex and smoker.

In [None]:
df.groupBy('sex', 'smoker').agg(avg('tip_percentage').alias('avg_tip_percentage')).show()

4. Use the seattle weather dataset referenced in the lesson to answer the questions below.
- Convert the temperatures to fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [None]:
from vega_datasets import data
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
df = spark.createDataFrame(weather)
df.show(6)

In [None]:
df = df.withColumn('temp_max', (col('temp_max') * 9/5) + 32)
df = df.withColumn('temp_min', (col('temp_min') * 9/5) + 32)

df.show(5)

In [None]:
rainy_df=df.withColumn('month', month(df['date'])).groupBy('month').agg(avg('precipitation').alias('avg_rainfall')).orderBy('avg_rainfall', ascending=False)

In [None]:
rdf=rainy_df.select('month').first()[0]
#NOVEMBER
print("The raniest month is:", rdf)

In [None]:
wind_df=df.withColumn('year', year(df['date'])).groupBy('year').agg(avg('wind').alias('avg_wind')).orderBy('avg_wind', ascending=False)

In [None]:
year_df=wind_df.select('year').first()[0]
print("The windiest year is:", year_df)

In [None]:
january_weather = df.filter(month(df['date']) == 1)
most_frequent_weather = january_weather.groupBy('weather').count().orderBy(col('count').desc()).first()[0]

print("The most frequent type of weather in January is:", most_frequent_weather)

In [None]:
from pyspark.sql.functions import month, year, avg
from pyspark.sql.types import IntegerType

In [None]:
df_filtered = df.filter((month(df['date']) == 7) & (year(df['date']).isin([2013, 2014])) & (col('weather') == 'sun'))

df_avg_temp = df_filtered.select(avg(col('temp_max')).cast(IntegerType()).alias('avg_high_temp'), avg(col('temp_min')).cast(IntegerType()).alias('avg_low_temp'))

avg_high_temp = df_avg_temp.select('avg_high_temp').first()[0]
avg_low_temp = df_avg_temp.select('avg_low_temp').first()[0]

print("Average high temperature on sunny days in July (2013 and 2014):", avg_high_temp)
print("Average low temperature on sunny days in July (2013 and 2014):", avg_low_temp)

In [None]:
q3_2015_rainy_days = df.filter((year(df['date']) == 2015) & (quarter(df['date']) == 3) & (col('weather') == 'rain'))
total_days_q3_2015 = df.filter((year(df['date']) == 2015) & (quarter(df['date']) == 3)).count()

rainy_days_percentage = (q3_2015_rainy_days.count() / total_days_q3_2015) * 100

print("Percentage of rainy days in Q3 2015:", rainy_days_percentage)

In [None]:
yearly_rain_days = df.filter(col('precipitation') > 0).groupBy(year('date')).agg((count('*') / count('date') * 100).alias('rainy_days_percentage')).orderBy(year('date'))

yearly_rain_days.show()