In [0]:
from pyspark.sql.functions import split, col

In [0]:
def read_hdfs(ruta_hdfs):
    csv = spark.read.format('csv')\
             .option("header", "true")\
             .option("inferSchema", "true")\
             .csv(ruta_hdfs)
    return csv

## Leer los datos almacenados

In [0]:
feature = read_hdfs('/FileStore/tables/features.csv')
sales = read_hdfs('/FileStore/tables/sales.csv')
stores = read_hdfs('/FileStore/tables/stores.csv')

### Crear una tabla por cada uno de los ficheros de datos copiados en HDFS

In [0]:
feature.createOrReplaceTempView('feature')
sales.createOrReplaceTempView('sales')
stores.createOrReplaceTempView('stores')

In [0]:
# Cambiar las columnas que deben tener un valor numérico
feature = feature.withColumn("CPI",feature.CPI.cast('double'))
feature = feature.withColumn("Unemployment",feature.Unemployment.cast('double'))
sales = sales.withColumn("Date",sales.Date.cast('date'))

In [0]:
sales

Out[22]: DataFrame[Store: int, Dept: int, Date: date, Weekly_Sales: double, IsHoliday: boolean]

### Mostrar las cinco primeras filas de cada tabla cargada

In [0]:
# Mostrar las primeras 5 filas de las tablas cargadas
for element in ['feature', 'sales', 'stores']:
    query = f'''SELECT * FROM {element} LIMIT 5''' 
    print(query)
    print(element) 
    spark.sql(query).show()

SELECT * FROM feature LIMIT 5
feature
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|05/02/2010|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|    false|
|    1|12/02/2010|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|     true|
|    1|19/02/2010|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|    false|
|    1|26/02/2010|      46.63|     2.561|       NA|       NA|       NA|       NA|       NA|211.3196429|       8.106|    false|
|    1|05/03/2010|       46.5|     2.625|       NA|       NA|       NA|  

### Contar el número de filas de cada tabla

In [0]:
for element in ['feature', 'sales', 'stores']:
    query = f'''SELECT COUNT(*) as {element} FROM {element}''' 
    print(query)
    print(f"El número de filas en {element} es : ")
    spark.sql(query).show()


SELECT COUNT(*) as feature FROM feature
El número de filas en feature es : 
+-------+
|feature|
+-------+
|   8190|
+-------+

SELECT COUNT(*) as sales FROM sales
El número de filas en sales es : 
+------+
| sales|
+------+
|421570|
+------+

SELECT COUNT(*) as stores FROM stores
El número de filas en stores es : 
+------+
|stores|
+------+
|    45|
+------+



### El rango (máximo y mínimo) de cada variable numérica

In [0]:
# Obtener el esquema de cada tabla
def get_number_column_name(table):
    schema = globals()[table].schema
    #Obtener el nombre de las columnas que son númericas
    columName = {table : [str(element).split('(')[1].split(',')[0] for element in schema if 'IntegerType' in str(element) or 'DoubleType' in str(element)]}
    return columName

In [0]:
columnFeature = get_number_column_name('feature')
columnSales = get_number_column_name('sales')
columnStores = get_number_column_name('stores')

In [0]:
for table in [columnFeature, columnSales, columnStores]:
    tableName = list(table.keys())[0]
    print(tableName)
    for column in list(table.values())[0]:
        query = f'''SELECT MIN({column}), MAX({column}) FROM {tableName}'''
        print(query)
        spark.sql(query).show()

feature
SELECT MIN(Store), MAX(Store) FROM feature
+----------+----------+
|min(Store)|max(Store)|
+----------+----------+
|         1|        45|
+----------+----------+

SELECT MIN(Temperature), MAX(Temperature) FROM feature
+----------------+----------------+
|min(Temperature)|max(Temperature)|
+----------------+----------------+
|           -7.29|          101.95|
+----------------+----------------+

SELECT MIN(Fuel_Price), MAX(Fuel_Price) FROM feature
+---------------+---------------+
|min(Fuel_Price)|max(Fuel_Price)|
+---------------+---------------+
|          2.472|          4.468|
+---------------+---------------+

SELECT MIN(CPI), MAX(CPI) FROM feature
+--------+--------+
|min(CPI)|max(CPI)|
+--------+--------+
| 126.064|      NA|
+--------+--------+

SELECT MIN(Unemployment), MAX(Unemployment) FROM feature
+-----------------+-----------------+
|min(Unemployment)|max(Unemployment)|
+-----------------+-----------------+
|           10.064|               NA|
+-----------------+

### Estudiar las diferentes categorías de las principales variables categóricas y el número de filas correspondientes a cada categoría.

Las variables categoricas según (suport, minilab, sf) son valores de una variable categórica son categorías o grupos mutuamente excluyentes.
Partiendo de esa premisa se extrae las variables categoricas de las tablas dejando la siguiente estructura:
  - **sales:** IsHoliday
  - **feature:** IsHoliday
  - **stores:** Type

In [0]:
categories = {'sales': ['IsHoliday'], 'feature': ['IsHoliday'], 'stores':['Type']}

In [0]:
for element in categories:
    print(element)
    for category in categories[element]:
        query = f'''SELECT {category}, COUNT(*) as CountValues  FROM {element} GROUP BY {category}''' 
        print(f"El número de filas en {element} es : ")
        print(query)
        spark.sql(query).show()


sales
El número de filas en sales es : 
SELECT IsHoliday, COUNT(*) as CountValues  FROM sales GROUP BY IsHoliday
+---------+-----------+
|IsHoliday|CountValues|
+---------+-----------+
|     true|      29661|
|    false|     391909|
+---------+-----------+

feature
El número de filas en feature es : 
SELECT IsHoliday, COUNT(*) as CountValues  FROM feature GROUP BY IsHoliday
+---------+-----------+
|IsHoliday|CountValues|
+---------+-----------+
|     true|        585|
|    false|       7605|
+---------+-----------+

stores
El número de filas en stores es : 
SELECT Type, COUNT(*) as CountValues  FROM stores GROUP BY Type
+----+-----------+
|Type|CountValues|
+----+-----------+
|   B|         17|
|   C|          6|
|   A|         22|
+----+-----------+



### Buscar valores inexistentes o anómalos

#### Inexistentes

In [0]:
for element in ['feature', 'sales', 'stores']:
    print(element)
    columns = [element[0] for element in spark.sql(f'''SHOW COLUMNS FROM {element}''').collect()]
    for columName in columns:
        print(columName)
        query = f''' SELECT COUNT({columName}) as NotNull, SUM(CASE WHEN {columName} IS NULL then 1 else 0 end) as NullCount, SUM(CASE WHEN {columName} = 'NA' then 1 else 0 end) as NACount FROM {element};''' 
        print(query)
        spark.sql(query).show()

feature
Store
 SELECT COUNT(Store) as NotNull, SUM(CASE WHEN Store IS NULL then 1 else 0 end) as NullCount, SUM(CASE WHEN Store = 'NA' then 1 else 0 end) as NACount FROM feature;
+-------+---------+-------+
|NotNull|NullCount|NACount|
+-------+---------+-------+
|   8190|        0|      0|
+-------+---------+-------+

Date
 SELECT COUNT(Date) as NotNull, SUM(CASE WHEN Date IS NULL then 1 else 0 end) as NullCount, SUM(CASE WHEN Date = 'NA' then 1 else 0 end) as NACount FROM feature;
+-------+---------+-------+
|NotNull|NullCount|NACount|
+-------+---------+-------+
|   8190|        0|      0|
+-------+---------+-------+

Temperature
 SELECT COUNT(Temperature) as NotNull, SUM(CASE WHEN Temperature IS NULL then 1 else 0 end) as NullCount, SUM(CASE WHEN Temperature = 'NA' then 1 else 0 end) as NACount FROM feature;
+-------+---------+-------+
|NotNull|NullCount|NACount|
+-------+---------+-------+
|   8190|        0|      0|
+-------+---------+-------+

Fuel_Price
 SELECT COUNT(Fuel_Price)

#### Anomalos
En base al documento anexos los valores anomalos son: 

**Feature**: Temperature (Al no especificar la procedencia de los datos, en este caso se van a tomar como datos anomalos una temperatura menor a **0 grados** y no superior a **30 grados** )

In [0]:
query = f'''SELECT COUNT(*) FROM feature WHERE Temperature < 0 OR Temperature > 30;''' 
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|    7613|
+--------+



**sales**: Weekly_Sales (No es posible que exista ventas en negativo)

In [0]:
query = f'''SELECT COUNT(*) FROM sales WHERE Weekly_Sales < 0;''' 
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|    1285|
+--------+



### Realizar alguna operación join

In [0]:
#Feature sales
query = f'''SELECT stores.Type, feature.IsHoliday, sales.Date, sales.Dept, sales.Weekly_Sales FROM stores INNER JOIN feature ON stores.Store = feature.Store INNER JOIN sales ON feature.Store = sales.Store WHERE Weekly_Sales > 0 ''' 
#Join y creación de vista
data_report = spark.sql(query)
data_report = data_report.withColumn("Month", split(col("Date"), "/").getItem(1)).withColumn("Year", split(col("Date"), "/").getItem(2))
data_report.createOrReplaceTempView('data_report')

In [0]:
#Probar vista creada
spark.sql(f'''SELECT * FROM data_report''').show()

+----+---------+----------+----+------------+-----+----+
|Type|IsHoliday|      Date|Dept|Weekly_Sales|Month|Year|
+----+---------+----------+----+------------+-----+----+
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1|     24924.5|   02|2010|
|   A|    false|05/02/2010|   1

## Ejercicio 

A la compañía en cuestión le gustaría, tras un primer análisis exploratorio sencillo, deducir alguna información interesante y que les pueda dar pistas sobre qué tal funcionan las ventas en cada tienda y en los departamentos de estas, cómo evolucionan las ventas a lo largo del año.

In [0]:
# Librerias para reporte 
import pandas as pd
import plotly.express as px


### Ventas en cada tipo de tienda por años

In [0]:
query = f'''SELECT Year, Type, SUM(Weekly_Sales) AS Tota_Sale_Year, MAX(Weekly_Sales) AS Higher_sales, MIN(Weekly_Sales) AS Lower_Sales, AVG(Weekly_Sales) AS Average_Sales
 FROM data_report GROUP BY Type, Year ORDER BY Year, Type  ASC;'''
ventas_anio = spark.sql(query)
ventas_anio.show()
df_ventas_anio = ventas_anio.toPandas()

+----+----+--------------------+------------+-----------+------------------+
|Year|Type|      Tota_Sale_Year|Higher_sales|Lower_Sales|     Average_Sales|
+----+----+--------------------+------------+-----------+------------------+
|2010|   A|2.667120366929903E11|    474330.1|       0.01|20375.839477487607|
|2010|   B|1.255779099866005...|   693099.36|       0.02|12675.224417846517|
|2010|   C|2.429329003009659...|   100712.42|       0.01| 9598.016506074731|
|2011|   A|2.872364877786299E11|   392023.02|       0.01|20159.959894259977|
|2011|   B|1.317927103730288...|   649770.18|       0.02|12233.883044388138|
|2011|   C|2.654913939313075...|   112152.35|       0.01| 9433.160389934636|
|2012|   A|2.343039366213181E11|   224917.94|       0.01| 19880.84730368465|
|2012|   B|1.067643596382634E11|   233140.32|       0.01|11932.818564886064|
|2012|   C|2.296003054435597E10|    110668.4|       0.02| 9635.225997095993|
+----+----+--------------------+------------+-----------+------------------+

In [0]:
df_ventas_line = df_ventas_anio[['Year', 'Type', 'Tota_Sale_Year']]
fig = px.line(df_ventas_line, x="Year", y="Tota_Sale_Year", title='Ventas por tipo de tienda cada año' , color='Type')
fig.show()

In [0]:
fig = px.bar(df_ventas_line.groupby(['Type']).sum().reset_index(), x='Type', y='Tota_Sale_Year', title='Ventas por tipo de tienda')
fig.show()

In [0]:
fig = px.bar(df_ventas_line.groupby(['Year']).sum().reset_index(), x='Year', y='Tota_Sale_Year', title='Ventas por año')
fig.show()

### Ventas por mes

In [0]:
query = f'''SELECT Year, Month, SUM(Weekly_Sales) AS Tota_Sale_Year, MAX(Weekly_Sales) AS Higher_sales, MIN(Weekly_Sales) AS Lower_Sales, AVG(Weekly_Sales) AS Average_Sales
 FROM data_report GROUP BY Month, Year ORDER BY Year, Month  ASC;'''
ventas_month = spark.sql(query)
ventas_month.show()
df_ventas_month = ventas_month.toPandas()

+----+-----+--------------------+------------+-----------+------------------+
|Year|Month|      Tota_Sale_Year|Higher_sales|Lower_Sales|     Average_Sales|
+----+-----+--------------------+------------+-----------+------------------+
|2010|   02|3.464115546890837E10|   293966.05|       0.01|16123.339183391454|
|2010|   03|3.311008999479466E10|   214383.07|       0.01|15480.222208983207|
|2010|   04|4.211734753224934...|   203457.42|       0.01|15786.479120672724|
|2010|   05|3.398151030274569E10|   206160.36|       0.01|16063.976173968103|
|2010|   06|3.498893806407283E10|   194723.71|       0.03|16551.606753332813|
|2010|   07|4.233004345205396E10|   198349.17|       0.01|16009.268745179435|
|2010|   08|3.415076355053869...|   204695.13|        0.1|16205.333618619297|
|2010|   09|3.226344867199576...|   205314.67|       0.28| 15172.17528243553|
|2010|   10|3.952486322513209...|   210596.66|       0.01|14839.055562696718|
|2010|   11|3.691964184272421E10|   693099.36|       0.25|  1736

In [0]:
df_ventas_month = df_ventas_month[['Year', 'Month', 'Tota_Sale_Year']]
fig = px.line(df_ventas_month.sort_values(by=['Month']), x="Month", y="Tota_Sale_Year", title='Ventas por mes cada año' , color='Year')
fig.show()

In [0]:
fig = px.bar(df_ventas_month.groupby(['Month']).sum().reset_index(), x='Month', y='Tota_Sale_Year', title='Ventas por mes')
fig.show()

### Ventas por tipo, mes y año

In [0]:
query = f'''SELECT Year, Type, Month, SUM(Weekly_Sales) AS Tota_Sale_Year, MAX(Weekly_Sales) AS Higher_sales, MIN(Weekly_Sales) AS Lower_Sales, AVG(Weekly_Sales) AS Average_Sales
 FROM data_report GROUP BY Month, Year, Type ORDER BY Year, Month, Type  ASC;'''
ventas_month_year = spark.sql(query)
ventas_month_year.show()
df_ventas_month_year = ventas_month_year.toPandas()

+----+----+-----+--------------------+------------+-----------+------------------+
|Year|Type|Month|      Tota_Sale_Year|Higher_sales|Lower_Sales|     Average_Sales|
+----+----+-----+--------------------+------------+-----------+------------------+
|2010|   A|   02|2.222890566623732E10|   293966.05|       0.01| 20238.08522286417|
|2010|   B|   02|1.037632355760100...|   232558.51|       0.94| 12369.87780429714|
|2010|   C|   02|2.0359262450798702E9|    95351.96|       0.02|  9635.14895779439|
|2010|   A|   03|2.111577450164284...|   214383.07|       0.01|19301.403929465254|
|2010|   B|   03| 9.981511479939302E9|   191989.54|       0.94| 12006.01350043699|
|2010|   C|   03|2.0128040132198968E9|    95079.18|       0.01| 9428.271705029354|
|2010|   A|   04|2.693571588525848...|   203457.42|       0.01|19738.388761001825|
|2010|   B|   04|1.264861466778000...|   145589.34|        0.1|12188.334407225537|
|2010|   C|   04| 2.533016979219781E9|    92534.76|        0.1| 9539.188286496776|
|201

In [0]:
fig = px.line(df_ventas_month_year.sort_values(by=['Month']), x="Month", y="Tota_Sale_Year", title='Ventas por mes cada mes por tipo de tienda' , color='Type')
fig.show()