# Apache Spark DataFrame

We use the `walmart_stock.csv` file as our dataset. It holds daily Walmart stock prices from January 2012 to December 2016.

### 1- Create an Apache Spark Session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### 2- Load the `walmart_stock.csv` file into a dataframe and infer the data schema

In [2]:
df_util = spark.read.csv("walmart_stock.csv", header=True, inferSchema=True)
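
To make `inferSchema=True` concrete, here is a much-simplified, plain-Python sketch of the idea. It scans each column and picks the narrowest type that parses every value. This is not Spark's implementation: Spark also recognizes timestamps, which is why `Date` becomes `timestamp` in step 3, while this sketch leaves it as a string. The two CSV rows are rebuilt from values displayed in step 4, so the raw file text may differ.

```python
import csv
import io

# Illustrative CSV text: header from step 3, values copied from two rows shown in step 4
SAMPLE_CSV = """Date,Open,High,Low,Close,Volume,Adj Close
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
"""

def infer_type(values):
    """Return the narrowest of 'integer', 'double', 'string' that fits every value.

    Simplified stand-in for inferSchema: no timestamp, boolean or null handling.
    """
    for name, parse in (("integer", int), ("double", float)):
        try:
            for v in values:
                parse(v)
            return name
        except ValueError:
            continue  # some value did not parse; try the next, wider type
    return "string"

reader = csv.reader(io.StringIO(SAMPLE_CSV))
header = next(reader)
rows = list(reader)
columns = list(zip(*rows))  # transpose rows into per-column tuples
schema = {name: infer_type(col) for name, col in zip(header, columns)}
print(schema)
```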

### 3- Display the column names and print the dataframe schema

In [3]:
df_util.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [4]:
df_util.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### 4- Print out the first five rows of the data

In [5]:
df_util.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### 5- Use the `describe()` method to get summary statistics for the data

In [6]:
df_util.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
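
The five statistics that `describe()` reports can be reproduced in plain Python. This sketch uses only the Open prices of the five rows from step 4, so its numbers will not match the table above. `stddev` here is the sample standard deviation, with n - 1 in the denominator (Spark's `stddev` is an alias for `stddev_samp`).

```python
import statistics

# Open prices of the five rows shown in step 4 (a tiny subset, not the full 1,258 rows)
open_prices = [59.970001, 60.209998999999996, 59.349998, 59.419998, 59.029999]

def summarize(values):
    """Mimic the five rows of describe(): count, mean, stddev, min, max."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        # sample standard deviation (n - 1 in the denominator)
        "stddev": statistics.stdev(values),
        "min": min(values),
        "max": max(values),
    }

for stat, value in summarize(open_prices).items():
    print(f"{stat:>6}: {value}")
```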

### 6- Use the `format_number` function to display the numbers with two decimal places
[format_number() documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=format_number#pyspark.sql.functions.format_number)

In [7]:
from pyspark.sql.functions import format_number

df_util_formatted = df_util.describe()

In [8]:
df_util_formatted.select(
    df_util_formatted['summary'],
    format_number(df_util_formatted['Open'].cast('float'), 2).alias('Open'),
    format_number(df_util_formatted['High'].cast('float'), 2).alias('High'),
    format_number(df_util_formatted['Low'].cast('float'), 2).alias('Low'),
    format_number(df_util_formatted['Close'].cast('float'), 2).alias('Close'),
    df_util_formatted['Volume'].cast('int').alias('Volume'),
).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [9]:
df_util_formatted.printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)
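
This schema explains the casts in step 6. `describe()` returns every statistic as a string, so each column is cast to a number before `format_number` is applied. `format_number` also inserts thousands separators, which is why the count row reads `1,258.00`. Below is a rough plain-Python analogue. It assumes Python's `,` format specifier as a stand-in for `format_number`, and rounding can differ in rare halfway cases.

```python
def format_number_like(value_str, decimals):
    """Cast a describe()-style string to float, then format it with comma grouping.

    Assumption: Python's ',' format spec approximates Spark's format_number.
    """
    return f"{float(value_str):,.{decimals}f}"

# describe() returns every statistic as a string (see the schema above)
print(format_number_like("1258", 2))               # count row
print(format_number_like("72.35785375357709", 2))  # mean of Open
```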



### 7- Create a new DataFrame with a column called HV Ratio: the ratio of the High price to the volume of stock traded that day

In [10]:
from pyspark.sql.functions import avg
avg_HV_ratio = df_util.agg(avg(df_util['High'] / df_util['Volume']).alias('Avg HV Ratio'))

In [11]:
avg_HV_ratio.show()

+--------------------+
|        Avg HV Ratio|
+--------------------+
|1.058087386431558E-5|
+--------------------+
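
In plain Python, step 7 involves two computations: a per-row ratio, which is what `withColumn` adds, and the mean of those ratios, which is what `agg(avg(...))` returns. This sketch applies both to the five rows from step 4. The data is a tiny subset, so it will not reproduce the value above.

```python
# (High, Volume) pairs from the five rows shown in step 4
sample = [
    (61.060001, 12668800),
    (60.349998, 9593300),
    (59.619999, 12768200),
    (59.450001, 8069400),
    (59.549999, 6679300),
]

# Per-row ratio, the plain-Python counterpart of withColumn("HV Ratio", High / Volume)
hv_ratios = [high / volume for high, volume in sample]

# Mean of the per-row ratios, the counterpart of agg(avg(High / Volume))
avg_hv_ratio = sum(hv_ratios) / len(hv_ratios)
print(hv_ratios)
print(avg_hv_ratio)
```

The mean of per-row ratios generally differs from total High divided by total Volume. The notebook computes the former.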



In [12]:
hv_ratio_df = df_util.withColumn("HV Ratio", df_util['High'] / df_util['Volume'])

In [13]:
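hv_ratio_df.show

<bound method DataFrame.show of DataFrame[Date: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double, HV Ratio: double]>

Because `show` is referenced without parentheses, the cell returns the bound method instead of printing rows. Its repr does confirm the new `HV Ratio: double` column. Calling `hv_ratio_df.select('HV Ratio').show()` displays the values.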

### 8- Which day had the peak High price?

In [14]:
df_util.groupBy('Date').agg({'High':'max'}).orderBy('Date', ascending=False).show()

+-------------------+---------+
|               Date|max(High)|
+-------------------+---------+
|2016-12-30 00:00:00|    69.43|
|2016-12-29 00:00:00|69.519997|
|2016-12-28 00:00:00|     70.0|
|2016-12-27 00:00:00|    69.82|
|2016-12-23 00:00:00|    69.75|
|2016-12-22 00:00:00|71.239998|
|2016-12-21 00:00:00|     72.0|
|2016-12-20 00:00:00|    71.93|
|2016-12-19 00:00:00|    71.75|
|2016-12-16 00:00:00|71.639999|
|2016-12-15 00:00:00|71.800003|
|2016-12-14 00:00:00|72.480003|
|2016-12-13 00:00:00|72.230003|
|2016-12-12 00:00:00|71.779999|
|2016-12-09 00:00:00|    70.43|
|2016-12-08 00:00:00|70.900002|
|2016-12-07 00:00:00|70.650002|
|2016-12-06 00:00:00|70.389999|
|2016-12-05 00:00:00|70.989998|
|2016-12-02 00:00:00|70.949997|
+-------------------+---------+
only showing top 20 rows



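Grouping and sorting by `Date` only lists the most recent trading days, so it does not answer the question. Sorting by `High` in descending order and taking the `Date` of the first row does:

In [16]:
df_util.orderBy(df_util['High'].desc()).head()[0]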

datetime.datetime(2015, 1, 13, 0, 0)
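
The logic of `orderBy(...desc()).head()` is simply "take the row with the largest High". This plain-Python version runs over a few of the December 2016 rows listed in step 8. Because it uses a subset, the answer is the peak within those rows only.

```python
from datetime import date

# (Date, High) pairs taken from the December 2016 rows listed in step 8 (a subset only)
rows = [
    (date(2016, 12, 30), 69.43),
    (date(2016, 12, 21), 72.0),
    (date(2016, 12, 14), 72.480003),
    (date(2016, 12, 13), 72.230003),
    (date(2016, 12, 2), 70.949997),
]

# Equivalent of orderBy(High.desc()).head()[0]: pick the row with the largest High
peak_date, peak_high = max(rows, key=lambda row: row[1])
print(peak_date, peak_high)
```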

### 9- What is the mean of the Close column?

In [17]:
df_util.agg({'Close':'mean'}).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



### 10- How many days was the Close lower than 70 USD?

In [18]:
df_util.createOrReplaceTempView("walmart_stock")

In [19]:
spark.sql("""
    SELECT COUNT(*) AS num_of_days_less_70
    FROM walmart_stock
    WHERE Close < 70
""").show()

+-------------------+
|num_of_days_less_70|
+-------------------+
|                397|
+-------------------+



In [20]:
num_days_close_lt_70 = spark.sql("SELECT COUNT(*) FROM walmart_stock WHERE Close < 70").collect()[0][0]
print(f"The number of days where the Close was lower than 70 USD is {num_days_close_lt_70}")

The number of days where the Close was lower than 70 USD is 397


In [21]:
df_util.filter(df_util['Close']<70).count()

397
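
The SQL `WHERE Close < 70` and the DataFrame `filter` both come down to counting the rows that satisfy a predicate. Here is a plain-Python equivalent on the five Close prices shown in step 4, all of which are below 70:

```python
# Close prices of the five rows shown in step 4
closes = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

def count_below(values, threshold):
    """Counterpart of df.filter(df['Close'] < threshold).count()."""
    return sum(1 for v in values if v < threshold)

print(count_below(closes, 70))
```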

### 11- What percentage of the time was the High greater than 80 USD?
#### In other words, (number of days with High > 80) / (total days in the dataframe) × 100

In [22]:
spark.sql("""
    SELECT (COUNT(*) / (SELECT COUNT(*) FROM walmart_stock)) * 100 AS high_percentage
    FROM walmart_stock
    WHERE High > 80
""").show()

+-----------------+
|  high_percentage|
+-----------------+
|9.141494435612083|
+-----------------+



In [23]:
# Count the number of days when the High was greater than 80
high_count = df_util.filter(df_util['High'] > 80).count()

# Calculate the total number of days in the dataframe
total_count = df_util.count()

# Calculate the percentage of the time when the High was greater than 80
percentage_high = (high_count/total_count) * 100

# Print the result
print(f"The percentage of time when the High was greater than 80 USD is: {round(percentage_high, 2)}%")


The percentage of time when the High was greater than 80 USD is: 9.14%
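
As a sanity check on the arithmetic: there are 1,258 rows (the count from `describe()`), so the reported 9.141494435612083% corresponds to 115 days with a High above 80. That count is back-computed from the percentage; the notebook does not print it.

```python
total_days = 1258  # row count reported by describe() in step 5
# Back-computed from the reported percentage; the notebook does not print this count
high_days = round(9.141494435612083 * total_days / 100)

percentage = high_days / total_days * 100
print(high_days, round(percentage, 2))
```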


### 12- What is the correlation between High and Volume?

In [24]:
df_util.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [25]:
# Not needed: the schema above already shows High as double, and the DataFrame
# returned by withColumn is not assigned, so df_util itself is unchanged
df_util.withColumn("High", df_util.High.cast('double'))

DataFrame[Date: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]

In [27]:
# High (double) and Volume (int) are already numeric, so corr can use them directly;
# it returns the Pearson correlation coefficient
df_util.stat.corr('High', 'Volume')

-0.3384326061737161
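
`DataFrame.stat.corr` computes the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations. Below is a plain-Python sketch of that formula, applied to the High and Volume values of the five rows from step 4. Five rows are far too few to reproduce -0.338, so the sketch illustrates only the formula.

```python
import math

# (High, Volume) values from the five rows shown in step 4 (subset only)
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]

def pearson(xs, ys):
    """Pearson correlation: covariance over the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    # The 1/n (or 1/(n - 1)) factors cancel, so plain sums are enough
    return cov / math.sqrt(var_x * var_y)

print(pearson(highs, volumes))
```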

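Formatting the columns before computing the correlation backfires. `format_number` returns strings with thousands separators, so `format_number(Volume, 0)` gives values such as `12,668,800`, which cannot be cast back to `integer`. `Volume_formatted` therefore becomes null, and `corr` returns `nan`. The correlation should be computed on the raw numeric columns, as in the previous cell.

In [28]: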
from pyspark.sql.functions import format_number

df_util_formatted = df_util.withColumn('Open_formatted', format_number(df_util['Open'], 2).cast('double')) \
                           .withColumn('High_formatted', format_number(df_util['High'], 2).cast('double')) \
                           .withColumn('Low_formatted', format_number(df_util['Low'], 2).cast('double')) \
                           .withColumn('Close_formatted', format_number(df_util['Close'], 2).cast('double')) \
                           .withColumn('Volume_formatted', format_number(df_util['Volume'], 0).cast('integer')) \
                           .withColumn('Adj_Close_formatted', format_number(df_util['Adj Close'], 2).cast('double'))

df_util_formatted.select('High_formatted', 'Volume_formatted').show(5)

df_util_formatted.stat.corr('High_formatted', 'Volume_formatted')



+--------------+----------------+
|High_formatted|Volume_formatted|
+--------------+----------------+
|         61.06|            null|
|         60.35|            null|
|         59.62|            null|
|         59.45|            null|
|         59.55|            null|
+--------------+----------------+
only showing top 5 rows



nan
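
The plain-Python sketch below mimics why the formatted approach fails: a comma-grouped string cannot be parsed back as an integer, whereas rounding keeps the value numeric. Python's formatting and `int()` stand in for `format_number` and `cast('integer')` here. The helper `try_int` is hypothetical, and Spark's counterpart to the rounding step would be `pyspark.sql.functions.round`.

```python
volume = 12668800  # first Volume value from step 4

formatted = f"{volume:,d}"  # '12,668,800', similar to format_number(Volume, 0)

def try_int(text):
    """Hypothetical helper: int(text), or None when parsing fails (Spark's cast gives null)."""
    try:
        return int(text)
    except ValueError:
        return None

print(formatted, try_int(formatted))  # the comma-grouped string does not parse

# Rounding keeps the value numeric, which is what a correlation needs
print(round(61.060001, 2))
```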

### 13- What is the max High per year (use GroupBy)?

In [29]:
from pyspark.sql.functions import year, month

In [30]:
df_year = df_util.withColumn('Year', year(df_util['Date']))

In [31]:
df_year.groupBy('year').max('High').show()

+----+---------+
|year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+
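
`groupBy('year').max('High')` keeps a running maximum per year. This plain-Python sketch uses a few (Date, High) pairs from steps 4 and 8. Because it uses a subset, the maxima are not the yearly values above.

```python
from collections import defaultdict
from datetime import date

# (Date, High) pairs: two rows from step 4 (2012) and two from step 8 (2016); subset only
rows = [
    (date(2012, 1, 3), 61.060001),
    (date(2012, 1, 4), 60.349998),
    (date(2016, 12, 14), 72.480003),
    (date(2016, 12, 30), 69.43),
]

# Counterpart of withColumn('Year', year(Date)).groupBy('Year').max('High')
max_high_by_year = defaultdict(lambda: float("-inf"))
for day, high in rows:
    max_high_by_year[day.year] = max(max_high_by_year[day.year], high)

for yr in sorted(max_high_by_year):
    print(yr, max_high_by_year[yr])
```

Spark does not guarantee the order of `groupBy` results, which is why the years above appear unsorted. Appending `.orderBy('Year')` would sort them.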



### 14- What is the average Close for each calendar month (Jan, Feb, Mar, etc.)?


In [32]:
month_df = df_util.withColumn('Month', month(df_util['Date']))
month_avg = month_df.select('Month', "Close").groupBy('Month').mean()

In [33]:
month_avg.select('Month', 'avg(Close)').orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+
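
Grouping on `month(Date)` alone pools the same calendar month across all five years. This plain-Python sketch of that grouping uses the Close prices of the five rows from step 4. All of them fall in January 2012, so there is a single bucket.

```python
from collections import defaultdict
from datetime import date

# (Date, Close) pairs from the five rows shown in step 4 (all January 2012)
rows = [
    (date(2012, 1, 3), 60.330002),
    (date(2012, 1, 4), 59.709998999999996),
    (date(2012, 1, 5), 59.419998),
    (date(2012, 1, 6), 59.0),
    (date(2012, 1, 9), 59.18),
]

# Key on the calendar month only, so January of every year lands in one bucket,
# mirroring withColumn('Month', month(Date)).groupBy('Month').mean()
totals = defaultdict(float)
counts = defaultdict(int)
for day, close in rows:
    totals[day.month] += close
    counts[day.month] += 1

avg_close_by_month = {m: totals[m] / counts[m] for m in sorted(totals)}
print(avg_close_by_month)
```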

