###Actividad Grupal: Análisis exploratorio con Apache Hive con HDFS


Importar bibliotecas

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DoubleType
import pyspark.sql.functions as F

Crear la sesión de Spark

In [2]:
spark = SparkSession.builder\
.appName("procma23")\
.getOrCreate()

In [166]:
spark

##Leemos los archivos de fuente de datos, con encabezado e infiriendo el esquema.

In [4]:
ruta_hdfs = "/procma23dir/features.csv"

# Leer el archivo de features
features = spark.read\
             .option("header", True)\
             .option("inferSchema", "true")\
             .csv(ruta_hdfs)

In [5]:
ruta_hdfs = "/procma23dir/stores.csv"

# Leer el archivo de features
stores = spark.read\
             .option("header", True)\
             .option("inferSchema", "true")\
             .csv(ruta_hdfs)

In [6]:
ruta_hdfs = "/procma23dir/sales.csv"

# Leer el archivo de features
sales = spark.read\
             .option("header", True)\
             .option("inferSchema", "true")\
             .csv(ruta_hdfs)

##Mostrar el contenido de las tablas

In [167]:
features.show(5)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|    false|
|    1|2010-02-12|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|     true|
|    1|2010-02-19|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|    false|
|    1|2010-02-26|      46.63|     2.561|      0.0|      0.0|      0.0|      0.0|      0.0|211.3196429|       8.106|    false|
|    1|2010-03-05|       46.5|     2.625|      0.0|      0.0|      0.0|      0.0|      0.0|211.3501429|       8

In [8]:
stores.show(5)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [9]:
sales.show(5)

+-----+----+----------+------------+---------+
|Store|Dept|      Date|Weekly_Sales|IsHoliday|
+-----+----+----------+------------+---------+
|    1|   1|05/02/2010|     24924.5|    false|
|    1|   1|12/02/2010|    46039.49|     true|
|    1|   1|19/02/2010|    41595.55|    false|
|    1|   1|26/02/2010|    19403.54|    false|
|    1|   1|05/03/2010|     21827.9|    false|
+-----+----+----------+------------+---------+
only showing top 5 rows



##Revisar el esquema de la tabla

In [10]:
features.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



Convertir a fecha el campo Date

In [11]:
features = features.withColumn("Date",\
            F.when(F.to_date(features.Date, "dd/MM/yyyy").isNotNull(),\
            F.to_date(features.Date, "dd/MM/yyyy")).otherwise(None))

Convertir NA a 0 para poder convertir las columnas a un tipo numérico

In [12]:
columns_for_replacement = ['MarkDown1', 'MarkDown2', 'MarkDown3',\
                           'MarkDown4', 'MarkDown5', 'CPI', 'Unemployment']
# replace NA for 0 to convert to int
for i in columns_for_replacement:
    features = features.withColumn(i, F.when((F.col(i)=='NA'),0)\
                                   .otherwise(F.col(i)).cast('double'))

Revisamos nuevamente el esquema:

In [13]:
features.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: double (nullable = true)
 |-- MarkDown2: double (nullable = true)
 |-- MarkDown3: double (nullable = true)
 |-- MarkDown4: double (nullable = true)
 |-- MarkDown5: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



Comprobamos la existencia de valores nulos:

In [14]:
features.select([F.count(F.when(F.col(c).isNull(), c)).alias(c)\
                 for c in features.columns])\
        .show()

+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+
|Store|Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|IsHoliday|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+
|    0|   0|          0|         0|        0|        0|        0|        0|        0|  0|           0|        0|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+



El dataset de "sales" solo requiere el cambio de la columna "Date"

In [15]:
sales = sales.withColumn("Date", F.when(F.to_date(sales.Date, "dd/MM/yyyy").isNotNull(),\
                                      F.to_date(sales.Date, "dd/MM/yyyy")).otherwise(None))

In [16]:
sales.filter("Date is null").count()

0

Contamos el número de registros en cada dataframe:

In [17]:
print(f"Registros en 'Features' : {features.count()}")
print(f"Registros en 'Stores' : {stores.count()}")
print(f"Registros en 'Sales' : {sales.count()}")

Registros en 'Features' : 8190
Registros en 'Stores' : 45
Registros en 'Sales' : 421570


Conteo usando SQL:

In [18]:
features.createOrReplaceTempView("features")
spark.sql("SELECT COUNT(*) as Count FROM features").show()

+-----+
|Count|
+-----+
| 8190|
+-----+



Obtener el máximo y el mínimo de cada columna:

In [19]:
dbl_cols = [t.name for t in features.schema.fields if isinstance(t.dataType, DoubleType)]


Máximos:

In [20]:
features.select([F.max(F.col(c)).alias(c) for c in dbl_cols]).show()

+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|     101.95|     4.468|103184.98|104519.54|149483.31| 67474.85| 771448.1|228.9764563|      14.313|
+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+



Mínimos:

In [21]:
features.select([F.min(F.col(c)).alias(c) for c in dbl_cols]).show()

+-----------+----------+---------+---------+---------+---------+---------+---+------------+
|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|
+-----------+----------+---------+---------+---------+---------+---------+---+------------+
|      -7.29|     2.472| -2781.45|  -265.76|  -179.26|      0.0|  -185.17|0.0|         0.0|
+-----------+----------+---------+---------+---------+---------+---------+---+------------+



In [22]:
features.createOrReplaceTempView("features")
spark.sql("SELECT Max(Temperature) as MaxTemp, Min(Temperature) as MinTemp, "\
          "Max(Fuel_Price) as MaxFuel, Min(Fuel_Price) as MinFuel, "\
          "Max(MarkDown1) as MaxMarkDown1, Min(MarkDown1) as MinMarkDown1, "\
          "Max(MarkDown2) as MaxMarkDown2, Min(MarkDown2) as MinMarkDown2, "\
          "Max(MarkDown3) as MaxMarkDown3, Min(MarkDown3) as MinMarkDown3, "\
          "Max(MarkDown4) as MaxMarkDown4, Min(MarkDown4) as MinMarkDown4, "\
          "Max(MarkDown5) as MaxMarkDown5, Min(MarkDown5) as MinMarkDown5, "\
          "Max(CPI) as MaxCPI, Min(CPI) as MinCPI, "\
          "Max(Unemployment) as MaxUnemp, Min(Unemployment) as MinUnemp "\
          "FROM features").show()

+-------+-------+-------+-------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+------+--------+--------+
|MaxTemp|MinTemp|MaxFuel|MinFuel|MaxMarkDown1|MinMarkDown1|MaxMarkDown2|MinMarkDown2|MaxMarkDown3|MinMarkDown3|MaxMarkDown4|MinMarkDown4|MaxMarkDown5|MinMarkDown5|     MaxCPI|MinCPI|MaxUnemp|MinUnemp|
+-------+-------+-------+-------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+------+--------+--------+
| 101.95|  -7.29|  4.468|  2.472|   103184.98|    -2781.45|   104519.54|     -265.76|   149483.31|     -179.26|    67474.85|         0.0|    771448.1|     -185.17|228.9764563|   0.0|  14.313|     0.0|
+-------+-------+-------+-------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+------+--------+-----

Agrupar y contar por categoría:

Tamaño y tipo de tienda:

In [51]:
stores.groupby('Type')\
    .agg(F.sum('Size').alias('TotalSize'))\
    .sort(F.col('Type').asc())\
    .show()

+----+---------+
|Type|TotalSize|
+----+---------+
|   A|  3899450|
|   B|  1720242|
|   C|   243250|
+----+---------+



Agrupar por tipo y tamaño de tieda, utilizando SQL:

In [46]:
stores.createOrReplaceTempView("stores")
spark.sql("SELECT Type, sum(Size) as TotalSize "\
          "FROM stores "\
          "GROUP BY Type "\
          "ORDER BY Type").show()

+----+---------+
|Type|TotalSize|
+----+---------+
|   A|  3899450|
|   B|  1720242|
|   C|   243250|
+----+---------+



¿Cuántos registros de "features" existen por cada tienda?

In [55]:
features.groupby('Store')\
    .agg(F.count('Store').alias('Count'))\
    .sort(F.col('Store').asc())\
    .show()

+-----+-----+
|Store|Count|
+-----+-----+
|    1|  182|
|    2|  182|
|    3|  182|
|    4|  182|
|    5|  182|
|    6|  182|
|    7|  182|
|    8|  182|
|    9|  182|
|   10|  182|
|   11|  182|
|   12|  182|
|   13|  182|
|   14|  182|
|   15|  182|
|   16|  182|
|   17|  182|
|   18|  182|
|   19|  182|
|   20|  182|
+-----+-----+
only showing top 20 rows



Características promedio de cada tieda:

In [86]:
features.groupby('Store')\
    .agg(F.avg('Temperature').alias('Average Temp'),\
         F.avg('Fuel_Price').alias('Average Fuel Price'),\
         F.avg('CPI').alias('Average CPI'),\
         F.avg('Unemployment').alias('Average Unemployment Rate'))\
    .sort(F.col('Store').asc())\
    .show()
    

+-----+-----------------+------------------+------------------+-------------------------+
|Store|     Average Temp|Average Fuel Price|       Average CPI|Average Unemployment Rate|
+-----+-----------------+------------------+------------------+-------------------------+
|    1|66.91203296703297|3.2592417582417585|201.75227351483505|         6.90949450549452|
|    2|66.72840659340655|3.2592417582417585|201.42445115769218|        6.875104395604414|
|    3|70.39417582417582|3.2592417582417585| 204.9265421626373|         6.50557692307692|
|    4|61.41664835164835|3.2548846153846167| 119.9687277516483|        5.244060439560446|
|    5|68.22450549450548|3.2592417582417585| 202.2840453252746|        5.722939560439558|
|    6|68.50467032967035|3.2592417582417585|203.20754767802205|       5.9545274725274755|
|    7|37.92126373626373|3.2944010989010994|180.81014821538466|         7.78008791208791|
|    8|61.18021978021979|3.2592417582417585|204.97095387582425|        5.522538461538464|
|    9|66.

Características promedio de cada tieda usando SQL:

In [80]:
features.createOrReplaceTempView("features") 
spark.sql("SELECT Store, avg(Temperature) as AvgTemp, "\
          "avg(Fuel_Price) as AvgFuelPrice, "\
          "avg(CPI) as AvgCPI, "\
          "avg(Unemployment) as AvgUnemploymentRate "\
          "FROM features "\
          "GROUP BY Store "\
          "ORDER BY Store").show()

+-----+-----------------+------------------+------------------+-------------------+
|Store|          AvgTemp|      AvgFuelPrice|            AvgCPI|AvgUnemploymentRate|
+-----+-----------------+------------------+------------------+-------------------+
|    1|66.91203296703297|3.2592417582417585|201.75227351483505|   6.90949450549452|
|    2|66.72840659340655|3.2592417582417585|201.42445115769218|  6.875104395604414|
|    3|70.39417582417582|3.2592417582417585| 204.9265421626373|   6.50557692307692|
|    4|61.41664835164835|3.2548846153846167| 119.9687277516483|  5.244060439560446|
|    5|68.22450549450548|3.2592417582417585| 202.2840453252746|  5.722939560439558|
|    6|68.50467032967035|3.2592417582417585|203.20754767802205| 5.9545274725274755|
|    7|37.92126373626373|3.2944010989010994|180.81014821538466|   7.78008791208791|
|    8|61.18021978021979|3.2592417582417585|204.97095387582425|  5.522538461538464|
|    9|66.26950549450548|3.2592417582417585| 205.1464342895604|  5.505923076

Registro histórico de ventas, que incluye el tipo de tienda y la temperatura en ese día.

In [164]:
overallSales = sales.alias('s')\
    .join(stores.alias('t'),['store'])\
    .join(features.alias('f'),\
          (sales['Store'] == features['Store']) &\
          (sales['Date'] == features['Date']))\
    .select(['s.Store', 't.Type', 's.Date',\
             's.Weekly_Sales', 'f.Temperature'])\
    .sort(F.col('Store').asc(), F.col('Date').asc())

Ventas por tieda:

In [165]:
overallSales.groupby(['Store', 'Date'])\
    .agg(F.sum('Weekly_Sales').alias('Weekly_Sales'),
         F.avg('Temperature').alias('avgTemp'))\
    .sort(F.col('Store').asc(), F.col('Date').asc())\
    .show()

+-----+----------+------------------+------------------+
|Store|      Date|      Weekly_Sales|           avgTemp|
+-----+----------+------------------+------------------+
|    1|2010-02-05|         1643690.9| 42.30999999999996|
|    1|2010-02-12|1641957.4399999997|38.510000000000055|
|    1|2010-02-19|        1611968.17|39.929999999999964|
|    1|2010-02-26|1409727.5900000003| 46.63000000000008|
|    1|2010-03-05|        1554806.68|              46.5|
|    1|2010-03-12|1439541.5900000005| 57.78999999999998|
|    1|2010-03-19|1472515.7899999998|54.579999999999956|
|    1|2010-03-26|1404429.9200000004| 51.44999999999994|
|    1|2010-04-02|1594968.2799999998| 62.27000000000003|
|    1|2010-04-09|1545418.5300000005| 65.85999999999999|
|    1|2010-04-16|        1466058.28| 66.32000000000001|
|    1|2010-04-23|        1391256.12| 64.84000000000007|
|    1|2010-04-30|1425100.7100000002| 67.40999999999993|
|    1|2010-05-07|1603955.1200000003|  72.5500000000001|
|    1|2010-05-14|1494251.50000

Por tipo de tienda:

In [218]:
overallSales.groupby(['Type', 'Date'])\
    .agg(F.sum('Weekly_Sales').alias('Weekly_Sales'),\
         F.avg('Temperature').alias('avgTemp'))\
    .sort(F.col('Type').asc(), F.col('Date').asc())\
    .show()

+----+----------+--------------------+------------------+
|Type|      Date|        Weekly_Sales|           avgTemp|
+----+----------+--------------------+------------------+
|   A|2010-02-05|3.2144126249999996E7|  34.0467724867725|
|   A|2010-02-12| 3.098257078999999E7|33.285416666666656|
|   A|2010-02-19|3.1000072759999998E7|  36.6127452271231|
|   A|2010-02-26|       2.800899599E7|38.864681274900406|
|   A|2010-03-05|        3.00205436E7| 42.38552317880796|
|   A|2010-03-12|        2.93775864E7|47.784394946808526|
|   A|2010-03-19|2.8748575600000005E7| 49.75015883520847|
|   A|2010-03-26|2.7872908220000003E7|47.665009980039905|
|   A|2010-04-02|3.2314949919999998E7| 53.09268260292165|
|   A|2010-04-09|3.0316177629999995E7| 58.61314777998673|
|   A|2010-04-16|       2.893962379E7|57.722824783477655|
|   A|2010-04-23|       2.846722192E7| 56.68800932090546|
|   A|2010-04-30|2.7959211279999997E7|57.601442885771554|
|   A|2010-05-07|        3.13582192E7| 64.16730053191488|
|   A|2010-05-

Registro histórico de ventas, que incluye el tipo de tienda y la temperatura en ese día, utilizando SQL:

In [None]:
features.createOrReplaceTempView("features")
stores.createOrReplaceTempView("stores")
sales.createOrReplaceTempView("sales")

spark.sql("SELECT s.Store, s.Date, sum(s.Weekly_Sales) as Weekly_Sales, avg(f.Temperature) as avtTemp "\
          "FROM sales s "\
          "JOIN stores t ON s.Store = t.Store "\
          "JOIN features f on s.Store = f.Store AND s.Date = f.Date "\
          "GROUP BY s.Store, s.Date "\
          "ORDER BY s.Store, s.Date").show()

Obtener las ventas de cuando es día festivo o no:

In [186]:
sales.groupby('isHoliday')\
    .agg(F.count('isHoliday').alias('Holidays'),\
         F.avg('Weekly_Sales').alias('avgSales'),\
         F.sum('Weekly_Sales').alias('sumSales'))\
    .show()

+---------+--------+------------------+-------------------+
|isHoliday|Holidays|          avgSales|           sumSales|
+---------+--------+------------------+-------------------+
|     true|   29661| 17035.82318735042|5.052995515600008E8|
|    false|  391909|15901.445069008514|6.231919435550057E9|
+---------+--------+------------------+-------------------+



Obtener las ventas de cuando es día festivo o no, utilizando SQL:

In [184]:
sales.createOrReplaceTempView("sales") 
spark.sql("SELECT s.IsHoliday, "\
          "COUNT(s.IsHoliday) as Holidays, "\
          "AVG(s.Weekly_Sales) as avgSales, "\
          "SUM(s.Weekly_Sales) as sumSales "\
          "FROM sales s "\
          "GROUP BY s.IsHoliday").show()

+---------+--------+------------------+-------------------+
|IsHoliday|Holidays|          avgSales|           sumSales|
+---------+--------+------------------+-------------------+
|     true|   29661| 17035.82318735042|5.052995515600008E8|
|    false|  391909|15901.445069008514|6.231919435550057E9|
+---------+--------+------------------+-------------------+



Cómo evolucionan las ventas a lo largo del año:

In [223]:
sales.select(F.date_format('Date', 'yyyy-MM')\
             .alias('yearmonth'),\
             'Weekly_Sales')\
     .groupby('yearmonth')\
     .agg(F.sum('Weekly_Sales').alias('Sales'))\
     .sort(F.col('yearmonth').asc())\
     .show()

+---------+--------------------+
|yearmonth|               Sales|
+---------+--------------------+
|  2010-02|1.9033298303999978E8|
|  2010-03|1.8191980249999988E8|
|  2010-04|2.3141236804999948E8|
|  2010-05|1.8671093434000015E8|
|  2010-06| 1.922461723600002E8|
|  2010-07|2.3258012597999993E8|
|  2010-08|1.8764011088999993E8|
|  2010-09|1.7726789637000018E8|
|  2010-10|2.1716182402000016E8|
|  2010-11|2.0285337014000016E8|
|  2010-12|2.8876053272000027E8|
|  2011-01| 1.637039668299999E8|
|  2011-02|1.8633132787000036E8|
|  2011-03|1.7935644829000002E8|
|  2011-04|2.2652651096999988E8|
|  2011-05|1.8164815816000035E8|
|  2011-06|      1.8977338519E8|
|  2011-07|      2.2991139887E8|
|  2011-08| 1.885993322500002E8|
|  2011-09|2.2084773841999996E8|
+---------+--------------------+
only showing top 20 rows



Cómo evolucionan las ventas a lo largo del año, utilizando SQL:

In [228]:
sales.createOrReplaceTempView("sales") 
spark.sql("SELECT date_format(s.date, 'y-MM') AS yearmonth, "\
          "sum(s.weekly_sales) AS sales "\
          "FROM sales s "\
          "GROUP BY date_format(s.date, 'y-MM') "\
          "ORDER BY date_format(s.date, 'y-MM')").show()

+---------+--------------------+
|yearmonth|               sales|
+---------+--------------------+
|  2010-02| 1.903329830399992E8|
|  2010-03|1.8191980249999988E8|
|  2010-04|2.3141236804999968E8|
|  2010-05|1.8671093434000012E8|
|  2010-06|1.9224617235999978E8|
|  2010-07| 2.325801259799998E8|
|  2010-08|1.8764011088999933E8|
|  2010-09| 1.772678963699998E8|
|  2010-10|2.1716182401999965E8|
|  2010-11| 2.028533701399999E8|
|  2010-12| 2.887605327199993E8|
|  2011-01|1.6370396682999998E8|
|  2011-02|1.8633132787000024E8|
|  2011-03|1.7935644828999972E8|
|  2011-04| 2.265265109699991E8|
|  2011-05|1.8164815816000038E8|
|  2011-06|      1.8977338519E8|
|  2011-07|2.2991139886999977E8|
|  2011-08|1.8859933225000036E8|
|  2011-09|2.2084773841999993E8|
+---------+--------------------+
only showing top 20 rows

