# Exploitation Zone - Model Predictiu

- exploitation zone del model predictiu
- preparation pipeline per taula d'entrenament del model --> cada zipcode és un indiv
    - Sales: 5 categories més comunes per zipcode, count vendes per zipcode, profit mitja per zipcode, mitjana num unitat per comanda per zipcode
    - Shops: 5shops més comunes per zipcode
    - Income: mitjana income per zipcode

In [1]:
#!pip install pyspark
#!pip install delta-spark

import pyspark
from delta import *

#!wget -O "HR_comma_sep.csv" "https://mydisk.cs.upc.edu/s/3o33yciBHADiFCD/download/HR_comma_sep.csv"

builder = pyspark.sql.SparkSession.builder.appName("Shops_Deltalake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
#Arxiu Parquet (de moment suposem aixo despres arreglem amb duckdb)
shops = spark.read.parquet("./datalake/shops_data/2024-04-24_shops_data.parquet")
income = spark.read.parquet("./datalake/income_data/2024-04-24_IRSIncomeByZipCode_NoStateTotalsNoSmallZips.parquet")
sales = spark.read.parquet("./datalake/sales_data/2024-04-24_SuperstoreSalesTraining.parquet")

In [4]:
############
## INCOME ##
############
from pyspark.sql.functions import col

# Lista de todos los caracteres inválidos que quieres reemplazar o eliminar
invalid_chars = [' ', ';', '{', '}', '(', ')', '\n', '\t', '=']

# Función para limpiar los nombres de las columnas reemplazando los caracteres no válidos
def clean_column_name(column_name):
    for invalid_char in invalid_chars:
        column_name = column_name.replace(invalid_char, "_")  # Reemplaza por subrayado o cualquier otro caracter válido que prefieras
    return column_name

# Aplicar la función de limpieza a cada columna
cleaned_income = income.select([col(c).alias(clean_column_name(c)) for c in income.columns])

# seleccionem files que ens interessen per MODEL PREDICTIU
income_selected = cleaned_income.select("ZIPCODE", "Total_income_amount")

In [5]:
###########
## SALES ##
###########
from pyspark.sql.functions import col

# Lista de todos los caracteres inválidos que quieres reemplazar o eliminar
invalid_chars = [' ', ';', '{', '}', '(', ')', '\n', '\t', '=']

# Función para limpiar los nombres de las columnas reemplazando los caracteres no válidos
def clean_column_name(column_name):
    for invalid_char in invalid_chars:
        column_name = column_name.replace(invalid_char, "_")  # Reemplaza por subrayado o cualquier otro caracter válido que prefieras
    return column_name

# Aplicar la función de limpieza a cada columna
cleaned_sales = sales.select([col(c).alias(clean_column_name(c)) for c in sales.columns])

# filtrar EEUU
sales_usa = cleaned_sales.filter(col("Country_/_Region") == "United States of America")

# eliminar row
sales_usa = sales_usa.dropDuplicates(subset=[col for col in sales_usa.columns if col != "row"])

# eliminar customer_name
sales_usa = sales_usa.drop("Customer_Name")

# no fa falta fer imputació de missings perque quan filtrem per USA no ens queden columnes amb missings

# eliminar missings a postal_code
sales_usa = sales_usa.dropna(subset=["Postal_Code"])

# eliminar missings a subregions
sales_usa = sales_usa.dropna(subset=["SubRegion"])

# eliminar files que missing a totes les columnes
sales_usa = sales_usa.dropna(how="all")


# seleccionem files que ens interessen per MODEL PREDICTIU
sales_selected = sales_usa.select("Postal_Code", "Category", "Sales", "Order_Quantity")

In [6]:
###########
## SHOPS ##
###########
from pyspark.sql.functions import col

# canviem nom columnes perque no ens deixa accedir-hi si tenen caracters especials
# geometry.x
new_column_name = "geometry_x"
old_column_name = "geometry.x"
shops = shops.withColumnRenamed(old_column_name, new_column_name)

# geometry.y
new_column_name = "geometry_y"
old_column_name = "geometry.y"
shops = shops.withColumnRenamed(old_column_name, new_column_name)

# attributes.shop
new_column_name = "shop"
old_column_name = "attributes.shop"
shops = shops.withColumnRenamed(old_column_name, new_column_name)

# attributes.name
new_column_name = "name"
old_column_name = "attributes.name"
shops = shops.withColumnRenamed(old_column_name, new_column_name)

# attributes.osm_id2
new_column_name = "index"
old_column_name = "attributes.osm_id2"
shops = shops.withColumnRenamed(old_column_name, new_column_name)

# attributes.addr_postcode
new_column_name = "postcode"
old_column_name = "attributes.addr_postcode"
shops = shops.withColumnRenamed(old_column_name, new_column_name)

# ens quedem només amb columnes seleccionades
selected_columns = ['geometry_x', 'geometry_y', "shop", "name", "index", "postcode"]
shops_selected = shops.select(selected_columns)

# eliminar files que tinguin missings --> excepte les que tenen missings a postcode!!
shops_selected = shops_selected.filter(~(col("geometry_x").isNull() |
                                col("geometry_y").isNull() |
                                col("shop").isNull() |
                                col("name").isNull() |
                                col("index").isNull()))

# seleccionem files que ens interessen per MODEL PREDICTIU
shops_selected = shops.select("shop", "postcode")

In [8]:
# creem dataframe buit amb les columnes que volem que tingui
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# definim estructura
schema = StructType([
    StructField("zipcode", StringType(), True),
    StructField("avg_income_per_zipcode", DoubleType(), True),
    StructField("shop1", StringType(), True),
    StructField("shop2", StringType(), True),
    StructField("shop3", StringType(), True),
    StructField("shop4", StringType(), True),
    StructField("shop5", StringType(), True),
    StructField("sales_cat1", StringType(), True),
    StructField("sales_cat2", StringType(), True),
    StructField("sales_cat3", StringType(), True),
    StructField("sales_cat4", StringType(), True),
    StructField("sales_cat5", StringType(), True),
    StructField("sales_per_zipcode", IntegerType(), True),
    StructField("avg_profit_per_zipcode", DoubleType(), True),
    StructField("avg_order_quantity_per_zipcode", DoubleType(), True)
])

# crear dataframe buit
df_model = spark.createDataFrame([], schema)

df_model.show()


+-------+----------------------+-----+-----+-----+-----+-----+----------+----------+----------+----------+----------+-----------------+----------------------+------------------------------+
|zipcode|avg_income_per_zipcode|shop1|shop2|shop3|shop4|shop5|sales_cat1|sales_cat2|sales_cat3|sales_cat4|sales_cat5|sales_per_zipcode|avg_profit_per_zipcode|avg_order_quantity_per_zipcode|
+-------+----------------------+-----+-----+-----+-----+-----+----------+----------+----------+----------+----------+-----------------+----------------------+------------------------------+
+-------+----------------------+-----+-----+-----+-----+-----+----------+----------+----------+----------+----------+-----------------+----------------------+------------------------------+



In [7]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

shops_selected = shops_selected.withColumnRenamed("postcode", "zipcode")

# Definir una ventana sobre la cual aplicar row_number()
window = Window.partitionBy("zipcode").orderBy(F.desc("count"))

# Calcular la cantidad de cada tipo de tienda para cada código postal
top_shops = (
    shops_selected.groupBy("zipcode", "shop")
         .count()
         .withColumn("row_num", F.row_number().over(window))
         .filter(F.col("row_num") <= 5)
         .select("zipcode", "shop", "count")
         .orderBy("zipcode", "row_num")
)

# Mostrar los resultados
top_shops.show()

+-------+------------+-----+
|zipcode|        shop|count|
+-------+------------+-----+
|   NULL| convenience|  327|
|   NULL|     clothes|  162|
|   NULL| supermarket|  102|
|   NULL|        gift|   64|
|   NULL|         yes|   48|
|  28000|     florist|    1|
|  28000|    copyshop|    1|
|  28000| supermarket|    1|
|  28000|    boutique|    1|
|  28000|dry_cleaning|    1|
|  28017|     clothes|    4|
|  28017| hairdresser|    3|
|  28017|      beauty|    3|
|  28017| convenience|    3|
|  28017|       paint|    2|
|  28020|  car_repair|    1|
|  28035|       shoes|    2|
|  28035|  stationery|    2|
|  28035|     laundry|    2|
|  28035|       paint|    2|
+-------+------------+-----+
only showing top 20 rows



In [9]:
# taula a partir de income
df_model = income_selected

# renombrem columnes
df_model = df_model.withColumnRenamed("ZIPCODE", "zipcode")
df_model = df_model.withColumnRenamed("Total_income_amount", "avg_income_per_zipcode")

# renombrem columnes de zipcode a sales i shops per poder fer join
shops_selected = shops_selected.withColumnRenamed("postcode", "zipcode")
sales_selected = sales_selected.withColumnRenamed("Postal_Code", "zipcode")

df_model.show()

+-------+----------------------+
|zipcode|avg_income_per_zipcode|
+-------+----------------------+
|  35004|                258024|
|  35005|                129390|
|  35006|                 58585|
|  35007|                651350|
|  35010|                382106|
|  35014|                 67885|
|  35016|                333226|
|  35019|                 35392|
|  35020|                262475|
|  35022|                521539|
|  35023|                480458|
|  35031|                112152|
|  35033|                 67437|
|  35034|                 52030|
|  35035|                 31542|
|  35040|                359868|
|  35042|                 96503|
|  35043|                363943|
|  35044|                124406|
|  35045|                236772|
+-------+----------------------+
only showing top 20 rows



In [12]:
shops_selected.show()

+-------------+-------+
|         shop|zipcode|
+-------------+-------+
|confectionery|   NULL|
|      seafood|   NULL|
|  supermarket|   NULL|
|      laundry|   NULL|
|      clothes|   NULL|
|    cosmetics|   NULL|
|  convenience|   NULL|
|      clothes|   NULL|
|      clothes|   NULL|
|          art|   NULL|
|      outdoor|   NULL|
|    chocolate|   NULL|
|       tattoo|   NULL|
|  supermarket|   NULL|
|      laundry|   NULL|
|        books|   NULL|
|     boutique|  96716|
|       bakery|   NULL|
|  supermarket|   NULL|
|      outdoor|   NULL|
+-------------+-------+
only showing top 20 rows



In [13]:
shops_cleaned = shops.dropna()
shops_cleaned.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `attributes`.`objectid` cannot be resolved. Did you mean one of the following? [`attributes`.`objectid`, `index`, `attributes`.`abandoned`, `attributes`.`addr_housename`, `attributes`.`addr_housenumber`, `attributes`.`addr_street`, `attributes`.`addr_city`, `attributes`.`addr_state`, `postcode`, `attributes`.`addr_province`, `attributes`.`addr_country`, `attributes`.`addr_district`, `attributes`.`addr_subdistrict`, `attributes`.`addr_unit`, `attributes`.`amenity`, `attributes`.`brand`, `attributes`.`building`, `name`, `attributes`.`operator`, `shop`, `geometry_x`, `geometry_y`].

In [33]:
# taula a partir de income
df_model = income_selected

# renombrem columnes
df_model = df_model.withColumnRenamed("ZIPCODE", "zipcode")
df_model = df_model.withColumnRenamed("Total_income_amount", "avg_income_per_zipcode")

# renombrem columnes de zipcode a sales i shops per poder fer join
shops_selected = shops_selected.withColumnRenamed("postcode", "zipcode")
sales_selected = sales_selected.withColumnRenamed("Postal_Code", "zipcode")

# modificar taula shops per que tingui les columnes que volem --> shop1, shop2, shop3, shop4, shop5


df_model = df_model.join(shops_selected, "zipcode", "left")


# modificar taula sales perque tingui les columnes que volem --> sales_cat1, sales_cat2, sales_cat3, sales_cat4, sales_cat5, sales_per_zipcode, avg_profit_per_zipcode, avg_order_quantity_per_zipcode


df_model = df_model.join(sales_selected, "zipcode", "left")



# Mostrar el DataFrame resultante
df_model.show()

+-------+----------------------+----+
|zipcode|avg_income_per_zipcode|shop|
+-------+----------------------+----+
|  35004|                258024|NULL|
|  35005|                129390|NULL|
|  35006|                 58585|NULL|
|  35007|                651350|NULL|
|  35010|                382106|NULL|
|  35014|                 67885|NULL|
|  35016|                333226|NULL|
|  35019|                 35392|NULL|
|  35020|                262475|NULL|
|  35022|                521539|NULL|
|  35023|                480458|NULL|
|  35031|                112152|NULL|
|  35033|                 67437|NULL|
|  35034|                 52030|NULL|
|  35035|                 31542|NULL|
|  35040|                359868|NULL|
|  35042|                 96503|NULL|
|  35043|                363943|NULL|
|  35044|                124406|NULL|
|  35045|                236772|NULL|
+-------+----------------------+----+
only showing top 20 rows



In [None]:
# introduim valors de zipcode
df_model = df_model.withColumn("zipcode", income_selected["ZIPCODE"])

# unim income amb df_model
df_model = df_model.join(income_selected, df_model.zipcode == income_selected.zipcode, how="left")


In [None]:
from pyspark.sql.functions import col, avg, count, desc, row_number
from pyspark.sql.window import Window

# Calcular la media de ingresos por código postal
income_avg = income_data.groupBy("zipcode").agg(avg("income").alias("avg_income"))

# Determinar las tiendas más comunes por código postal
window = Window.partitionBy("zipcode").orderBy(desc("count"))
top_shops = sales_data.groupBy("zipcode", "shop").agg(count("*").alias("count")).\
    withColumn("rn", row_number().over(window)).filter(col("rn") <= 5)

# Calcular métricas de ventas por código postal y tienda
sales_metrics = sales_data.groupBy("zipcode", "shop", "category").\
    agg(count("*").alias("count"), avg("sales").alias("avg_sales"), avg("profit").alias("avg_profit"),
        avg("num_units").alias("avg_num_units"))

# Combinar todas las métricas en una sola tabla
result_table = sales_metrics.join(income_avg, "zipcode", "left").\
    join(top_shops, ["zipcode", "shop"], "left")

# Mostrar el resultado
result_table.show()

# Finalizar la sesión de Spark
spark.stop()


In [None]:
# finalitzar sessió de Spark
spark.stop()