<a href="https://colab.research.google.com/github/iara/jump_workshop_pyspark/blob/main/Case2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [18]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [19]:
import findspark

findspark.init()

In [23]:
from pyspark.sql import SparkSession
# Note: sum and max imported here shadow the Python built-ins of the same name.
from pyspark.sql.functions import col, sum, avg, max, udf


spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()


Load the Walmart stock CSV file and have Spark infer the data types.

In [26]:
!wget --quiet --show-progress https://raw.githubusercontent.com/pratikbarjatya/spark-walmart-data-analysis-exercise/master/walmart_stock.csv

df = spark.read.csv("./walmart_stock.csv", inferSchema=True, header=True, sep=",")



What are the column names?

In [31]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

What does the schema look like?

In [32]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



Print out the first 5 rows.


In [34]:
for line in df.head(5):
    print(line, '\n')

Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996) 

Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475) 

Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539) 

Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922) 

Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004) 



Which day had the peak High price?

In [38]:
df.orderBy(df['High'].desc()).select(['Date']).head(1)[0]['Date']

'2015-01-13'

What is the mean of the Close column?

In [39]:
from pyspark.sql.functions import mean
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



What are the maximum and minimum of the Volume column?

In [40]:
from pyspark.sql.functions import min, max
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+




On how many days was the Close below 60 dollars?

In [42]:
df.filter(df['Close'] < 60).count()

81

What percentage of the time was the High above 80 dollars?
In other words, (number of days with High > 80) / (total number of days in the dataset).

In [43]:
df.filter('High > 80').count() * 100/df.count()

9.141494435612083
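
The ratio in the question can be checked by hand on a tiny sample. The following is a plain-Python sketch of the same arithmetic, independent of Spark; `percent_above` and `sample_highs` are hypothetical names and values chosen for illustration, not part of the notebook or the dataset.

```python
def percent_above(values, threshold):
    """Percentage of entries strictly greater than threshold."""
    if not values:
        raise ValueError("values must not be empty")
    # Count the entries that satisfy the condition, like filter(...).count()
    hits = sum(1 for v in values if v > threshold)
    # Same formula as the cell above: matching rows * 100 / total rows
    return hits * 100 / len(values)

# Hypothetical High prices, not taken from walmart_stock.csv
sample_highs = [79.5, 80.0, 80.5, 81.2]
print(percent_above(sample_highs, 80))  # 50.0
```

Note that the comparison is strict, so a High of exactly 80 does not count, which matches the `'High > 80'` filter.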

What is the maximum High per year?

In [45]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format)
year_df = df.withColumn('Year', year(df['Date']))
year_df.groupBy('Year').max()['Year', 'max(High)'].show()


+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+
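
The years in the output above are not in order, because a grouped result has no guaranteed row order unless it is sorted explicitly. As a rough illustration of what the group-and-max step computes, here is a plain-Python sketch on a few hand-made rows; `max_high_per_year` and `sample_rows` are hypothetical and the values are not taken from the dataset.

```python
from datetime import date

def max_high_per_year(rows):
    """Map each calendar year to the largest High seen in that year.

    rows: iterable of (iso_date_string, high) pairs.
    """
    best = {}
    for day, high in rows:
        # Extract the year from a 'YYYY-MM-DD' string, the role year() plays above
        yr = date.fromisoformat(day).year
        if yr not in best or high > best[yr]:
            best[yr] = high
    # Sort by year so the result is ordered
    return dict(sorted(best.items()))

# Hypothetical rows, not taken from walmart_stock.csv
sample_rows = [
    ("2012-01-03", 61.0),
    ("2012-06-15", 68.5),
    ("2013-02-01", 70.25),
    ("2013-11-29", 81.0),
]
print(max_high_per_year(sample_rows))  # {2012: 68.5, 2013: 81.0}
```

In the Spark version, adding `.orderBy('Year')` before `.show()` should give the same kind of sorted listing.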



What is the average Close for each calendar month?
In other words, across all years, what is the average closing price for January, February, March, etc.?
The result will have one value for each of these months.

In [46]:
#Create a new column Month from existing Date column
month_df = df.withColumn('Month', month(df['Date']))
#Group by month and take average of all other columns
month_df = month_df.groupBy('Month').mean()
#Sort by month
month_df = month_df.orderBy('Month')
#Display only month and avg(Close), the desired columns
month_df['Month', 'avg(Close)'].show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



In [47]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [51]:
#Create a new column Month from existing Date column
month_df = df.withColumn('Month', month(df['Date']))
#Preview the first 5 rows with the new Month column
month_df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+-----+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|Month|
+----------+------------------+---------+---------+------------------+--------+------------------+-----+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|    1|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|    1|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|    1|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|    1|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|    1|
+----------+------------------+---------+---------+------------------+--------+------------------+-----+
only showing top 5 rows



In [54]:
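# @udf without arguments produces a UDF that returns a string column
@udf
def to_month_str(number_month):
    # Month names indexed 0..11, so month number n maps to index n-1
    month_lst = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
    if number_month is not None:
        return month_lst[number_month-1]
    # Missing month numbers stay null
    return None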
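
The lookup in `to_month_str` can also be taken from the standard library, which avoids typing the month names by hand. The sketch below is a plain-Python alternative, not the notebook's method; `month_name_from_number` is a hypothetical name, and it assumes the default locale, where `calendar.month_name` yields English names.

```python
import calendar

def month_name_from_number(number_month):
    """Return the month name for 1..12, or None for missing or out-of-range input."""
    if number_month is None or not 1 <= number_month <= 12:
        return None
    # calendar.month_name[0] is an empty string, so valid indexes start at 1
    return calendar.month_name[number_month]

print(month_name_from_number(1))   # January (assuming the default locale)
print(month_name_from_number(13))  # None
```

A function like this could be wrapped with `udf(month_name_from_number, StringType())`, with `StringType` imported from `pyspark.sql.types`, and used in `select` the same way as `to_month_str`.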

In [56]:
month_df.select('*', to_month_str("Month").alias("Name_Month")).show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+-----+----------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|Month|Name_Month|
+----------+------------------+------------------+------------------+------------------+--------+------------------+-----+----------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|    1|   January|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|    1|   January|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|    1|   January|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|    1|   January|
|2012-01-09|         59.029999|         59.549999|         58.