In [111]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import pandas as pd
import numpy as np 

# Local-mode Spark session (reused if one already exists in this kernel).
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)

# Explicit schema for the raw coffee price CSV; column names are lower-cased in the next cell.
coffee_schema = """Date string, Open float, High float, Low float, Close float, Volume float, Currency string"""
cdf = spark.read.csv(
    "data/coffee.csv",
    schema=coffee_schema,
    header=True,
)

In [112]:
# Lower-case every column name so later code can refer to open/high/low/close/volume.
cdf = cdf.toDF(*[c.lower() for c in cdf.columns])

# Per-day derived columns:
#   oc_diff  - open minus close, rounded to 4 decimal places
#   hl_diff  - high minus low, rounded to 4 decimal places
#   over_100 - True if the day's volume was 100 or above, otherwise False
cdf = (
    cdf
    .withColumn("oc_diff", sf.round(sf.col("open") - sf.col("close"), 4))
    .withColumn("hl_diff", sf.round(sf.col("high") - sf.col("low"), 4))
    .withColumn("over_100", sf.col("volume") >= 100)
)

# abs_oc_diff - absolute value of oc_diff. Uses the built-in sf.abs instead of a
#   Python UDF: sf.udf without a returnType yields a StringType column, and
#   abs(None) would raise inside the UDF for a null oc_diff.
# net_sales   - average of open, close, high and low prices multiplied by the
#   day's volume, rounded to 4 decimal places.
cdf = (
    cdf
    .withColumn("abs_oc_diff", sf.abs(sf.col("oc_diff")))
    .withColumn(
        "net_sales",
        sf.round(
            (sf.col("open") + sf.col("close") + sf.col("high") + sf.col("low")) / 4
            * sf.col("volume"),
            4,
        ),
    )
)



In [113]:
# Summary statistics over the enriched frame, expressed as a SQL query:
#   avg_open        - mean opening price
#   avg_abs_oc_diff - mean absolute open/close difference
#   max_high        - highest 'high' price
# The frame is registered as the temp view "coffee" so the query can reference it.
cdf.createOrReplaceTempView("coffee")
result = spark.sql("""
select ROUND(AVG(open), 4) as avg_open,
       ROUND(AVG(abs_oc_diff), 4) as avg_abs_oc_diff,
       MAX(high) as max_high 
from coffee
""")
result.show()

# Number of days whose volume was below 100 (over_100 is False); shown as the cell output.
cdf.where(~cdf.over_100).count()

+--------+---------------+--------+
|avg_open|avg_abs_oc_diff|max_high|
+--------+---------------+--------+
|126.0497|         1.7606|  306.25|
+--------+---------------+--------+



1638

In [116]:
# Persist the enriched frame as Parquet. mode("overwrite") keeps this cell
# re-runnable: the default save mode ("errorifexists") fails as soon as
# data/coffee.parquet already exists, which breaks Restart & Run All.
cdf.write.mode("overwrite").parquet("data/coffee.parquet")

                                                                                