## Projeto PySpark:
### Passos -
- Use the walmart_stock.csv file to complete the tasks below;
- Start a simple Spark Session;
- Load the Walmart Stock CSV file, have Spark infer the data types;
- Verify what kind of columns the file have;
- Verify the name of the columns;
- Print out the first 5 columns;
- Use describe() to learn about the DataFrame;
- Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

### _Bonus Question!_

There are too many decimal places for mean and stddev in the describe() dataframe. 
Format the numbers to just show up to two decimal places. Pay careful attention to the datatypes that .describe() returns,
we didn't cover how to do this exact formating, but we covered something very similar. 


### Questions:
1. What day had the Peak High in price ?
2. What is the mean of the Close column ?
3. What is the max and min of the Volume column ?
4. How many days was the Close lower than 60 dollars ?
5. What percentage of the time was the High greater than 80 dollars ? (Number of Days High>80/Total Days in the dataset)
6. What is the Pearson correlation between High and Volume ? (corr)
7. What is the max High per year ?
What is the average Close for each Calendar Month ?

In [1]:
import findspark
findspark.init()

In [156]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import corr

In [3]:
spark = SparkSession\
    .builder\
    .appName('exercise')\
    .getOrCreate()

In [4]:
spark

In [5]:
df = spark\
    .read\
    .option('header', 'true')\
    .csv('C:/Users/drumo/OneDrive/Documentos/Estudo/Programação/VSCode/Python/Datasets/walmart_stock.csv')

In [6]:
df.show(10)

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

In [7]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [10]:
df.select('Date').tail(1)

[Row(Date='2016-12-30')]

In [11]:
df.head(5)

[Row(Date='2012-01-03', Open='59.970001', High='61.060001', Low='59.869999', Close='60.330002', Volume='12668800', Adj Close='52.619234999999996'),
 Row(Date='2012-01-04', Open='60.209998999999996', High='60.349998', Low='59.470001', Close='59.709998999999996', Volume='9593300', Adj Close='52.078475'),
 Row(Date='2012-01-05', Open='59.349998', High='59.619999', Low='58.369999', Close='59.419998', Volume='12768200', Adj Close='51.825539'),
 Row(Date='2012-01-06', Open='59.419998', High='59.450001', Low='58.869999', Close='59.0', Volume='8069400', Adj Close='51.45922'),
 Row(Date='2012-01-09', Open='59.029999', High='59.549999', Low='58.919998', Close='59.18', Volume='6679300', Adj Close='51.616215000000004')]

In [12]:
new_df = df.withColumn('HV Ratio', F.col('High')/F.col('volume'))

In [13]:
new_df.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

In [24]:
print('Colunas Atuais:',df.columns)

Colunas Atuais: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [30]:
df = df.withColumnRenamed('Date','Data')\
    .withColumnRenamed('Open', 'Valor_Abertura')\
    .withColumnRenamed('High', 'Alta')\
    .withColumnRenamed('Low', 'Baixa')\
    .withColumnRenamed('Close', 'Valor_Fechamento')

In [33]:
df.na.fill('Inavalid Value')

DataFrame[Data: string, Valor_Abertura: string, Alta: string, Baixa: string, Valor_Fechamento: string, Volume: string, Adj Close: string]

In [None]:
novos_nomes = ['Data', 'Valor_Abertura', 'Alta', 'Baixa', 'Valor_Fechamento', 'Volume', 'Adj Close', 'HV Ratio']

In [None]:
new_df = new_df.toDF(*novos_nomes)

In [147]:
new_df.printSchema()

root
 |-- Data: string (nullable = true)
 |-- Valor_Abertura: string (nullable = true)
 |-- Alta: string (nullable = true)
 |-- Baixa: string (nullable = true)
 |-- Valor_Fechamento: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- HV Ratio: double (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- Meses: integer (nullable = true)



In [93]:
# Desvio padrão da coluna dos valores de abertura

desvio = new_df.select(F.stddev('Valor_Abertura').alias('desvio'))
desvio.select(F.format_number('desvio', 4).alias(' Desvio Padrão(Abertura): ')).show()

+--------------------------+
| Desvio Padrão(Abertura): |
+--------------------------+
|                    6.7681|
+--------------------------+



In [101]:
# Média dos valores de fechamento

media = new_df.select(F.mean('Valor_Fechamento').alias('fechamento'))
media.select(F.format_number('fechamento', 1).alias('Média do Valor de Fechamento')).show()

+----------------------------+
|Média do Valor de Fechamento|
+----------------------------+
|                        72.4|
+----------------------------+



In [107]:
# Máximo e Mínimo dentre os valores da coluna de volume de venda

new_df.select(F.min('Volume').alias(' Valor Mínimo '), F.max('Volume').alias(' Valor Máximo ')).show()

+--------------+--------------+
| Valor Mínimo | Valor Máximo |
+--------------+--------------+
|      10010500|       9994400|
+--------------+--------------+



In [108]:
# Quantidade de dias em que o valor de fechamento foi menor que U$60,00

new_df.select('Valor_Fechamento')\
    .where(F.col('Valor_Fechamento') < 60)\
    .count()

81

In [121]:
valores_positivos = new_df.select('Data')\
    .where(F.col('Alta') > 80)\
    .count()

valores_totais = new_df.select('Alta').count()

print(f'Porcentagem de Altas maiores que U$80,00: {(valores_positivos/valores_totais)*100:.2f}%')

Porcentagem de Altas maiores que U$80,00: 8.43%


In [129]:
new_df = new_df.withColumn('Ano', F.year(new_df['Data']))
new_df = new_df.withColumn('Meses', F.month(new_df['Data']))

In [159]:
# Mede o grau de correlação entre duas variáveis de escala métrica:

new_df.select(corr('Alta', 'Volume').alias('Correlação entre Altas e Volume')).show()

+-------------------------------+
|Correlação entre Altas e Volume|
+-------------------------------+
|            -0.3384326061737161|
+-------------------------------+

