In [149]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import DateType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
from pyspark.sql.functions import round, col, avg, max

In [137]:
# Start a Spark session in local mode, or reuse one that already exists (getOrCreate).
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)

In [138]:
# Read the coffee price history CSV into a Spark DataFrame.
# Every column is a float except 'Date' (DateType) and 'Currency' (StringType);
# all fields are nullable. Supplying an explicit schema avoids type inference.
data_file = './data/coffee.csv'

price_volume_fields = [
    StructField(name, FloatType(), True)
    for name in ('Open', 'High', 'Low', 'Close', 'Volume')
]
csvSchema = StructType(
    [StructField('Date', DateType(), True)]
    + price_volume_fields
    + [StructField('Currency', StringType(), True)]
)

# header=True: the first line of the file holds column names, not data.
df = spark.read.csv(data_file, schema=csvSchema, header=True)

df.show(4)

+----------+------+-----+------+------+------+--------+
|      Date|  Open| High|   Low| Close|Volume|Currency|
+----------+------+-----+------+------+------+--------+
|2000-01-03|122.25|124.0| 116.1| 116.5|6640.0|     USD|
|2000-01-04|116.25|120.5|115.75|116.25|5492.0|     USD|
|2000-01-05| 115.0|121.0| 115.0| 118.6|6165.0|     USD|
|2000-01-06| 119.0|121.4| 116.5|116.85|5094.0|     USD|
+----------+------+-----+------+------+------+--------+
only showing top 4 rows



In [139]:
# Daily spreads, each rounded to 2 decimal places:
#   open_close = Open - Close  (signed; positive when Close ended below Open)
#   high_low   = High - Low    (intraday range)
df = (
    df
    .withColumn('open_close', F.round(F.col('Open') - F.col('Close'), 2))
    .withColumn('high_low', F.round(F.col('High') - F.col('Low'), 2))
)

df.show(4)

+----------+------+-----+------+------+------+--------+----------+--------+
|      Date|  Open| High|   Low| Close|Volume|Currency|open_close|high_low|
+----------+------+-----+------+------+------+--------+----------+--------+
|2000-01-03|122.25|124.0| 116.1| 116.5|6640.0|     USD|      5.75|     7.9|
|2000-01-04|116.25|120.5|115.75|116.25|5492.0|     USD|       0.0|    4.75|
|2000-01-05| 115.0|121.0| 115.0| 118.6|6165.0|     USD|      -3.6|     6.0|
|2000-01-06| 119.0|121.4| 116.5|116.85|5094.0|     USD|      2.15|     4.9|
+----------+------+-----+------+------+------+--------+----------+--------+
only showing top 4 rows



In [140]:
# Flag high-volume days: True when 'Volume' is 100 or above, otherwise False.
# Using >= with .otherwise(False) means Volume == 100 is flagged True and every
# other row (including a null Volume) gets False. The previous `> 100` / `< 100`
# pair left those rows as null.
df = df.withColumn(
    'high_volume',
    F.when(F.col('Volume') >= 100, True).otherwise(False),
)

# Absolute size of the Open/Close move. F.abs keeps this an explicit Spark
# expression instead of relying on Python's builtin abs() applied to a Column.
df = df.withColumn('open_close_abs', F.abs(F.col('open_close')))

df.show(5)

+----------+------+------+------+------+------+--------+----------+--------+-----------+--------------+
|      Date|  Open|  High|   Low| Close|Volume|Currency|open_close|high_low|high_volume|open_close_abs|
+----------+------+------+------+------+------+--------+----------+--------+-----------+--------------+
|2000-01-03|122.25| 124.0| 116.1| 116.5|6640.0|     USD|      5.75|     7.9|       true|          5.75|
|2000-01-04|116.25| 120.5|115.75|116.25|5492.0|     USD|       0.0|    4.75|       true|           0.0|
|2000-01-05| 115.0| 121.0| 115.0| 118.6|6165.0|     USD|      -3.6|     6.0|       true|           3.6|
|2000-01-06| 119.0| 121.4| 116.5|116.85|5094.0|     USD|      2.15|     4.9|       true|          2.15|
|2000-01-07|117.25|117.75| 113.8|114.15|6855.0|     USD|       3.1|    3.95|       true|           3.1|
+----------+------+------+------+------+------+--------+----------+--------+-----------+--------------+
only showing top 5 rows



In [141]:
# net_sales = avg(Open, Close, High, Low) * Volume, rounded to 2 decimal places.
# Column names use the schema's exact capitalisation ('Open', not 'open'). The
# lowercase spelling only resolved because Spark's analyzer is case-insensitive
# by default (spark.sql.caseSensitive=false).
mean_price = (F.col('Open') + F.col('Close') + F.col('High') + F.col('Low')) / 4
df = df.withColumn('net_sales', F.round(mean_price * F.col('Volume'), 2))

df.show(5)

+----------+------+------+------+------+------+--------+----------+--------+-----------+--------------+---------+
|      Date|  Open|  High|   Low| Close|Volume|Currency|open_close|high_low|high_volume|open_close_abs|net_sales|
+----------+------+------+------+------+------+--------+----------+--------+-----------+--------------+---------+
|2000-01-03|122.25| 124.0| 116.1| 116.5|6640.0|     USD|      5.75|     7.9|       true|          5.75|794891.01|
|2000-01-04|116.25| 120.5|115.75|116.25|5492.0|     USD|       0.0|    4.75|       true|           0.0|643593.75|
|2000-01-05| 115.0| 121.0| 115.0| 118.6|6165.0|     USD|      -3.6|     6.0|       true|           3.6|723771.01|
|2000-01-06| 119.0| 121.4| 116.5|116.85|5094.0|     USD|      2.15|     4.9|       true|          2.15|603320.63|
|2000-01-07|117.25|117.75| 113.8|114.15|6855.0|     USD|       3.1|    3.95|       true|           3.1|793380.58|
+----------+------+------+------+------+------+--------+----------+--------+-----------+--------------+---------+

In [150]:
# Average of the absolute difference between 'Open' and 'Close'.
df.agg(F.avg(F.col('open_close_abs'))).show()

# Number of days whose 'Volume' was less than 100. Filtering on 'Volume' directly
# answers the question without depending on how 'high_volume' was derived, and
# avoids comparing a boolean column against the string 'false'.
df.filter(F.col('Volume') < 100).agg(F.count(F.lit(1)).alias('low_volume_days')).show()

# Average 'Open' value.
df.agg(F.avg(F.col('Open'))).show()

# Highest 'High' value (F.max, not the builtin max shadowed by the imports).
df.agg(F.max(F.col('High'))).show()


+-------------------+
|avg(open_close_abs)|
+-------------------+
| 1.7606027500686203|
+-------------------+

+------------------+
|count(high_volume)|
+------------------+
|              1638|
+------------------+

+-----------------+
|        avg(Open)|
+-----------------+
|126.0496775257701|
+-----------------+

+---------+
|max(High)|
+---------+
|   306.25|
+---------+



In [151]:
# Save the DataFrame, including the five derived columns (open_close, high_low,
# high_volume, open_close_abs, net_sales), to ./data as a Parquet dataset.
# mode('overwrite') keeps Restart & Run All working: the default save mode
# raises an error when the output path already exists.
df.write.mode('overwrite').parquet('./data/coffee.parquet')

                                                                                