# Análisis de Grandes Volúmenes de Datos
## Proyecto BigData PySpark
### Evidencia 1 - Particionamiento

Integrantes:

+ Jorge Barón Bracho - A01422588
+ Elda Cristina Morales Sánchez de la Barquera - A00449074
+ Eduardo Selim Martínez Mayorga - A01795167
+ José Arturo Valdivia Rivera - A01795395

In [1]:
#Descargamos las librerias necesarias, iniciamos sesion de Pyspark y cargamos la base de datos
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import countDistinct, count, isnan, when, desc
from pyspark.sql.functions import col, expr, mean, stddev, min, max, length, ceil
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.types import NumericType
from pyspark.sql.functions import concat_ws,col
import math
import pandas as pd

spark = SparkSession.builder.appName("CSVRead").getOrCreate()
df = spark.read.csv("Iowa_Liquor_Sales.csv", header=True, inferSchema=True)

25/05/04 22:25:54 WARN Utils: Your hostname, MacBook-Air-de-Eduardo.local resolves to a loopback address: 127.0.0.1; using 192.168.100.10 instead (on interface en0)
25/05/04 22:25:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/04 22:25:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/04 22:26:08 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [3]:
# Cambiamos los nombres de las columnas para facilitar su manejo al programar
df = df.withColumnsRenamed({"Invoice/Item Number": "invoice_number",
"Date": "date",
"Store Number": "store_number",
"Store Name": "store_name",
"Address": "address",
"City": "city",
"Zip Code": "zip_code",
"Store Location": "store_location",
"County Number": "county_number",
"County": "county",
"Category": "category",
"Category Name": "category_name",
"Vendor Number": "vendor_number",
"Vendor Name": "vendor_name",
"Item Number": "item_number",
"Item Description": "item_description",
"Pack": "pack",
"Bottle Volume (ml)": "bottle_volume",
"State Bottle Cost": "bottle_cost",
"State Bottle Retail": "bottle_retail",
"Bottles Sold": "bottles_sold",
"Sale (Dollars)": "sale_dollars",
"Volume Sold (Liters)": "liters_sold",
"Volume Sold (Gallons)": "gallons_sold"})

In [4]:
# Variables de caracterización
vars_caracterizacion = ["category","vendor_number"]

# Total de filas para calcular proporciones
total = df.count()

# Agrupar por combinaciones y contar ocurrencias
df_grouped = df.groupBy(vars_caracterizacion).agg(count("*").alias("conteo"))

# Calcular probabilidad de cada combinación
df_prob = df_grouped.withColumn("probabilidad", col("conteo") / total)
# Conteo nos dice cuantas veces se repite una combinacion
# Probabilidad mustra la probabilidad que hay de que salga dicha combinación
# Mostrar ejemplo de combinaciones y sus probabilidades
df_prob = df_prob.orderBy(desc("probabilidad"))

                                                                                

In [5]:
df_prob.show(5)



+--------+-------------+------+--------------------+
|category|vendor_number|conteo|        probabilidad|
+--------+-------------+------+--------------------+
| 1081600|          421|962193|0.036779791532520936|
| 1012100|          260|828176| 0.03165699670672834|
| 1011200|           65|825949|0.031571869714801644|
| 1012100|          115|684958| 0.02618249399915867|
| 1062400|          260|449083|0.017166180922953193|
+--------+-------------+------+--------------------+
only showing top 5 rows



                                                                                

In [6]:
# Creamos la etiqueta del estrato concatenando el número de categoría y el número de vendedor con un guion de por medio
df_estrato = df_prob.select(concat_ws('_',df_prob.category,df_prob.vendor_number).alias("estrato"),
                            "category", "vendor_number", "conteo", "probabilidad")

In [7]:
df_estrato.show(5)



+-----------+--------+-------------+------+--------------------+
|    estrato|category|vendor_number|conteo|        probabilidad|
+-----------+--------+-------------+------+--------------------+
|1081600_421| 1081600|          421|962193|0.036779791532520936|
|1012100_260| 1012100|          260|828176| 0.03165699670672834|
| 1011200_65| 1011200|           65|825949|0.031571869714801644|
|1012100_115| 1012100|          115|684958| 0.02618249399915867|
|1062400_260| 1062400|          260|449083|0.017166180922953193|
+-----------+--------+-------------+------+--------------------+
only showing top 5 rows



                                                                                

In [8]:
# Creamos la etiqueta del estrato en el dataset completo
# concatenando el número de categoría y el número de vendedor con un guion de por medio
df = df.withColumn("estrato", concat_ws('_',df.category,df.vendor_number))

In [9]:
df.show(5)

+--------------+----------+------------+--------------------+--------------------+----------+--------+--------------------+-------------+-------+--------+--------------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------+-------------+------------+------------+-----------+------------+-----------+
|invoice_number|      date|store_number|          store_name|             address|      city|zip_code|      store_location|county_number| county|category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume|bottle_cost|bottle_retail|bottles_sold|sale_dollars|liters_sold|gallons_sold|    estrato|
+--------------+----------+------------+--------------------+--------------------+----------+--------+--------------------+-------------+-------+--------+--------------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------+-------------+--------

In [10]:
# Se define la proporción de la población que se requiere muestrear
prop_poblacion = 0.10
# Se obtiene el tamaño total de la muestra
n = math.ceil(total*prop_poblacion)

En este caso, el tamaño total de la muestra es de

In [11]:
n

2616092

El tamaño de la muestra de cada estrato se calcula mediante la expresión

$$n_h = \frac{n\cdot N_h}{N},$$

donde

+ $n$ es el tamaño de la muestra total
+ $N$ es el tamaño de la población total
+ $N_h$ es el tamaño de la población en el estrato $h$
+ $n_h$ es el tamaño de la muestra del estrato $h$

In [12]:
# Se obtiene una columna con el tamaño de muestra requerido de cada estrato
df_estrato = df_estrato.withColumn("muestra", col("probabilidad") * n)
df_estrato = df_estrato.withColumn("tamanio_muestra", ceil("muestra"))
df_estrato = df_estrato.withColumn("proporcion_estrato", col("tamanio_muestra")/col("conteo"))

In [13]:
df_estrato.show(5)

                                                                                

+-----------+--------+-------------+------+--------------------+-----------------+---------------+-------------------+
|    estrato|category|vendor_number|conteo|        probabilidad|          muestra|tamanio_muestra| proporcion_estrato|
+-----------+--------+-------------+------+--------------------+-----------------+---------------+-------------------+
|1081600_421| 1081600|          421|962193|0.036779791532520936|96219.31838989576|          96220|0.10000072750477296|
|1012100_260| 1012100|          260|828176| 0.03165699670672834|82817.61582849837|          82818|0.10000048298912309|
| 1011200_65| 1011200|           65|825949|0.031571869714801644|82594.91578593486|          82595|0.10000012107285075|
|1012100_115| 1012100|          115|684958| 0.02618249399915867|68495.81309124701|          68496|0.10000029198870587|
|1062400_260| 1062400|          260|449083|0.017166180922953193|44908.30858309046|          44909|0.10000155873190479|
+-----------+--------+-------------+------+-----

In [14]:
# Se convierte el spark DataFrame en Pandas dataframe para extraer la proporción por estratos a muestrear
pandas_estratos = df_estrato.toPandas()

                                                                                

In [15]:
pandas_estratos.head()

Unnamed: 0,estrato,category,vendor_number,conteo,probabilidad,muestra,tamanio_muestra,proporcion_estrato
0,1081600_421,1081600.0,421.0,962193,0.03678,96219.31839,96220,0.100001
1,1012100_260,1012100.0,260.0,828176,0.031657,82817.615828,82818,0.1
2,1011200_65,1011200.0,65.0,825949,0.031572,82594.915786,82595,0.1
3,1012100_115,1012100.0,115.0,684958,0.026182,68495.813091,68496,0.1
4,1062400_260,1062400.0,260.0,449083,0.017166,44908.308583,44909,0.100002


In [16]:
# Se construye un diccionario con la proporción a muestrear requerida en cada estrato
proporciones = pandas_estratos.set_index('estrato')['proporcion_estrato'].to_dict()

In [17]:
# RESULTADO FINAL
# Se obtiene la muestra por cada estrato del tamaño adecuado
muestra_df = df.sampleBy(col = "estrato", fractions = proporciones, seed = 5)

In [19]:
muestra_df.show(5)

+--------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+-------+--------+--------------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------+-------------+------------+------------+-----------+------------+-----------+
|invoice_number|      date|store_number|          store_name|             address|           city|zip_code|      store_location|county_number| county|category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume|bottle_cost|bottle_retail|bottles_sold|sale_dollars|liters_sold|gallons_sold|    estrato|
+--------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+-------+--------+--------------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------+-------

In [20]:
# Se calcula de nuevo la cantidad de elementos en cada estrato DE LA MUESTRA
# También se calcula de nuevo la probabilidad de ocurrencia pero EN LA MUESTRA
muestra_df_grouped = muestra_df.groupBy(vars_caracterizacion).agg(count("*").alias("conteo"))
muestra_df_prob = muestra_df_grouped.withColumn("probabilidad", col("conteo") / n)
muestra_df_prob = muestra_df_prob.orderBy(desc("probabilidad"))

In [21]:
muestra_df_prob.show(5)



+--------+-------------+------+--------------------+
|category|vendor_number|conteo|        probabilidad|
+--------+-------------+------+--------------------+
| 1081600|          421| 96232|  0.0367846390723262|
| 1012100|          260| 83212| 0.03180774988035589|
| 1011200|           65| 82610|  0.0315776356488992|
| 1012100|          115| 68233| 0.02608203381226654|
| 1062400|          260| 44914|0.017168356464528006|
+--------+-------------+------+--------------------+
only showing top 5 rows



                                                                                