# **Master's in Applied Artificial Intelligence**
## **Course: Big Data Analysis**
### Tecnológico de Monterrey
### Prof. Iván Olmos

## **Week 03 Activity**

### **Project: Reading and Writing Big Data Files with PySpark**

##### Names and student IDs of the team members:
*   Victoria Melgarejo Cabrera - A01795030
*   Héctor Alejandro Alvarez Rosas - A01796262
*   Andrea Xcaret Gomez Alfaro - A01796384
*   Mario Guillen De La Torre - A01796701


---


#### **Activity Description:**

Identify sampling techniques suited to Big Data by implementing code in PySpark, in order to build a representative sample of the target population of the research project.

### **Dataset Used**

The dataset selected for this study is the Chicago Taxi Trips Dataset, a public database provided by the City of Chicago's Open Data Portal. It contains detailed information on taxi trips in the city of Chicago, including pickup and dropoff locations, trip duration, distance traveled, fares, and payment methods.
The dataset is part of Chicago's open government initiative, which seeks to promote the use of public data to support academic research, data-driven public policy, and technological innovation in the urban domain. The data is publicly available at the following link:
https://data.cityofchicago.org/Transportation/Taxi-Trips-2024-/ajtu-isnz/about_data

---

### **Library Imports**

In [1]:
from itertools import product

import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as stats
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Note: `round` below is Spark's column function and shadows Python's built-in round()
from pyspark.sql.functions import (
    broadcast, col, count, isnan, lit, percentile_approx, round, when,
)
from pyspark.sql.functions import sum as spark_sum

### **Creating the Spark Session**

In [2]:
spark = SparkSession.builder \
    .appName("ChicagoTaxiTripsAnalysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.python.worker.timeout", "600") \
    .config("spark.python.worker.retries", "3") \
    .getOrCreate()

In [3]:
# Name of the CSV file
filename = "Taxi_Trips__2024-__20250426.csv"

# Local path where the file is stored
local_path = f"C:/Users/mario/Maestria/Grandes Cantidades de Datos/{filename}"

# Load the CSV file (the header row provides the column names; column types are inferred)
df = spark.read.csv(local_path, header=True, inferSchema=True)

In [4]:
print("Number of records:", df.count())
print("Number of columns:", len(df.columns))

Number of records: 7917844
Number of columns: 23


### **Function Definitions**

In [5]:
# df: DataFrame; column: name of the column to summarize.
# Returns one row per distinct value with its count and its percentage of all rows.
def percentageOfValues(df, column):
    total_count = df.count()
    return (
        df.groupBy(column)
        .agg(count("*").alias("count"))
        .withColumn("percentage", round((col("count") / lit(total_count)) * 100, 2))
    )

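# df: DataFrame; column: column name; percentage: minimum share (in %) a value needs to be kept.
# Values below the threshold are replaced with "Other". Null values never match the
# join key, so they are also relabeled as "Other".
def replaceRareValues(df, column, percentage):
    total_count = df.count()
    value_counts = (
        df.groupBy(column)
        .agg(count("*").alias("count"))
        .withColumn("percentage", (col("count") / lit(total_count)) * 100)
    )
    # Values frequent enough to keep; a small set, so it is broadcast in the join
    frequent = value_counts.filter(col("percentage") >= percentage).select(column)

    df_modified = df.join(broadcast(frequent).withColumnRenamed(column, "keep_val"), on=df[column] == col("keep_val"), how="left") \
           .withColumn(column, when(col("keep_val").isNotNull(), col(column)).otherwise(lit("Other"))) \
           .drop("keep_val")

    return df_modified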


# Counts the outliers of a column using the 1.5 * IQR rule
def count_outliers(df, column):
    # Compute the Q1 and Q3 percentiles for the column
    percentiles = df.approxQuantile(column, [0.25, 0.75], 0.05)  # 5% relative error
    Q1 = percentiles[0]
    Q3 = percentiles[1]

    # Compute the IQR
    IQR = Q3 - Q1

    # Define the lower and upper limits for outliers
    lower_limit = Q1 - 1.5 * IQR
    upper_limit = Q3 + 1.5 * IQR

    # A value is an outlier if it is below the lower limit OR above the upper limit
    return df.filter((col(column) < lower_limit) | (col(column) > upper_limit)).count()

# Displays the rows whose value in the column is an outlier under the 1.5 * IQR rule
def show_outliers(df, column):
    # Compute the Q1 and Q3 percentiles for the column
    percentiles = df.approxQuantile(column, [0.25, 0.75], 0.05)  # 5% relative error
    Q1 = percentiles[0]
    Q3 = percentiles[1]

    # Compute the IQR
    IQR = Q3 - Q1

    # Define the lower and upper limits for outliers
    lower_limit = Q1 - 1.5 * IQR
    upper_limit = Q3 + 1.5 * IQR

    # show() prints the rows and returns None, so there is nothing to return
    df.filter((col(column) < lower_limit) | (col(column) > upper_limit)).show()

# Returns a new DataFrame without the outliers of the column.
# Spark DataFrames are immutable, so the caller must reassign the result.
def remove_outliers_inplace(df, column):
    # Compute the Q1 and Q3 percentiles for the column
    percentiles = df.approxQuantile(column, [0.25, 0.75], 0.05)  # 5% relative error
    Q1 = percentiles[0]
    Q3 = percentiles[1]

    # Compute the IQR
    IQR = Q3 - Q1

    # Define the lower and upper limits for outliers
    lower_limit = Q1 - 1.5 * IQR
    upper_limit = Q3 + 1.5 * IQR

    # Keep only the rows whose value lies within the limits
    df = df.filter((col(column) >= lower_limit) & (col(column) <= upper_limit))

    return df

def categorical_summary(df, column):
    # Total count of non-null values (named so it does not shadow the imported `count`)
    non_null_count = df.filter(F.col(column).isNotNull()).count()

    # Count of distinct values (a null, if present, counts as one distinct value)
    unique = df.select(column).distinct().count()

    # Most frequent value and its frequency
    top_row = (df.groupBy(column)
                 .agg(F.count("*").alias("freq"))
                 .orderBy(F.desc("freq"))
                 .first())

    top = top_row[column] if top_row else None
    freq = top_row["freq"] if top_row else 0

    # Create a summary dictionary
    summary = {
        "count": non_null_count,
        "unique": unique,
        "top": top,
        "freq": freq
    }

    return summary
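
The outlier helpers in the cell above are built on the 1.5 × IQR rule. As a minimal sketch of that rule outside Spark, the following pure-Python example computes the two limits and flags the values outside them. The function names and the `trip_miles` list are hypothetical, and exact quartiles from the `statistics` module stand in for Spark's `approxQuantile`, so the limits may differ slightly from what Spark would report.

```python
import statistics

def iqr_bounds(values):
    """Return (lower, upper) limits: Q1 - 1.5 * IQR and Q3 + 1.5 * IQR."""
    # Exact quartiles on a small in-memory list (assumption: the data fits in memory)
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr

def find_outliers(values):
    """A value is an outlier when it is below the lower limit OR above the upper limit."""
    lower, upper = iqr_bounds(values)
    return [v for v in values if v < lower or v > upper]

# Hypothetical trip distances in miles, with one extreme value
trip_miles = [1, 2, 3, 4, 5, 6, 7, 8, 100]
print(iqr_bounds(trip_miles))
print(find_outliers(trip_miles))
```

Since no value can be below the lower limit and above the upper limit at once, the two conditions have to be combined with a logical OR.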

### **Analysis and Transformation of Characteristic Variables**

In [6]:
total_count = df.count()

In [None]:
categorical_summary(df,"Dropoff Community Area")

In [7]:
percentageOfValues(df, "Dropoff Community Area").show()
# Group areas with fewer than 5% of the trips under "Other", computing the result once
dfTransformed = replaceRareValues(df, "Dropoff Community Area", 5)
dropoff_pct = percentageOfValues(dfTransformed, "Dropoff Community Area")
dropoff_pct.show()
dropoff_pct.toPandas().to_csv("C:/Users/mario/Maestria/Grandes Cantidades de Datos/VariablesCaracteristicas/DropoffCommunityAreaGrouped.csv", index=False)

+----------------------+------+----------+
|Dropoff Community Area| count|percentage|
+----------------------+------+----------+
|                    31| 23632|       0.3|
|                    65|  4253|      0.05|
|                    53| 18630|      0.24|
|                    34| 31027|      0.39|
|                    28|742125|      9.37|
|                    76|409654|      5.17|
|                    27| 11710|      0.15|
|                    26|  5011|      0.06|
|                    44| 47285|       0.6|
|                    12| 14106|      0.18|
|                    22| 83931|      1.06|
|                    47|  3380|      0.04|
|                  NULL|742953|      9.38|
|                     1| 57171|      0.72|
|                    52|  2987|      0.04|
|                    13| 17030|      0.22|
|                     6|342680|      4.33|
|                    16| 52925|      0.67|
|                     3|148623|      1.88|
|                    40| 13365|      0.17|
+----------------------+------+----------+
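
To make the grouping step easier to follow, here is a minimal pure-Python sketch of the same idea: any value whose share of the rows falls below a threshold is relabeled as "Other". The function `replace_rare_values` and the `areas` list are hypothetical illustrations, not part of the notebook. Unlike the Spark version, this sketch keeps a missing value whenever it is frequent enough.

```python
from collections import Counter

def replace_rare_values(values, min_percentage, other_label="Other"):
    """Relabel every value whose share of the rows is below min_percentage."""
    counts = Counter(values)
    total = len(values)
    # Values that reach the threshold are kept unchanged
    frequent = {v for v, c in counts.items() if c / total * 100 >= min_percentage}
    return [v if v in frequent else other_label for v in values]

# Hypothetical community-area codes: 28 and 76 are common, 65 and 47 are rare
areas = [28] * 50 + [76] * 46 + [65] * 3 + [47] * 1
grouped = replace_rare_values(areas, min_percentage=5)
print(Counter(grouped))
```

With a 5% threshold, the two rare codes (3% and 1% of the rows) collapse into a single "Other" category that holds 4% of the rows.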

In [None]:
categorical_summary(df,"Pickup Community Area")

In [8]:
percentageOfValues(df, "Pickup Community Area").show()
# Group areas with fewer than 5% of the trips under "Other", computing the result once
dfTransformed = replaceRareValues(dfTransformed, "Pickup Community Area", 5)
pickup_pct = percentageOfValues(dfTransformed, "Pickup Community Area")
pickup_pct.show()
pickup_pct.toPandas().to_csv("C:/Users/mario/Maestria/Grandes Cantidades de Datos/VariablesCaracteristicas/PickupCommunityAreaGrouped.csv", index=False)

+---------------------+-------+----------+
|Pickup Community Area|  count|percentage|
+---------------------+-------+----------+
|                   31|  11065|      0.14|
|                   65|   3004|      0.04|
|                   53|  25630|      0.32|
|                   34|  20178|      0.25|
|                   28| 752297|       9.5|
|                   76|1681489|     21.24|
|                   26|   3625|      0.05|
|                   27|   7150|      0.09|
|                   44|  45537|      0.58|
|                   12|  11949|      0.15|
|                   22|  34547|      0.44|
|                   47|   2907|      0.04|
|                 NULL| 226777|      2.86|
|                    1|  44780|      0.57|
|                   52|   5595|      0.07|
|                   13|  11117|      0.14|
|                    6| 251015|      3.17|
|                   16|  35522|      0.45|
|                    3| 125293|      1.58|
|                   40|  12125|      0.15|
+---------------------+-------+----------+

In [None]:
categorical_summary(df,"Payment Type")

In [9]:
percentageOfValues(df, "Payment Type").show()
# Keep the three main payment types and group every other value (including nulls) under "Otro"
payment_group = when(col("Payment Type").isin("Credit Card", "Cash", "Mobile"), col("Payment Type")).otherwise("Otro")
payment_pct = percentageOfValues(df.withColumn("payment_group", payment_group), "payment_group")
payment_pct.show()
payment_pct.toPandas().to_csv("C:/Users/mario/Maestria/Grandes Cantidades de Datos/VariablesCaracteristicas/PaymentTypeGrouped.csv", index=False)
dfTransformed = dfTransformed.withColumn("payment_group", payment_group)

+------------+-------+----------+
|Payment Type|  count|percentage|
+------------+-------+----------+
| Credit Card|3066154|     38.72|
|   No Charge|  21298|      0.27|
|     Unknown| 321508|      4.06|
|      Prcard| 951509|     12.02|
|        Cash|2213895|     27.96|
|     Dispute|   4709|      0.06|
|      Mobile|1338767|     16.91|
|     Prepaid|      4|       0.0|
+------------+-------+----------+

+-------------+-------+----------+
|payment_group|  count|percentage|
+-------------+-------+----------+
|         Otro|1299028|     16.41|
|  Credit Card|3066154|     38.72|
|         Cash|2213895|     27.96|
|       Mobile|1338767|     16.91|
+-------------+-------+----------+



In [None]:
categorical_summary(df,"company")

In [10]:
percentageOfValues(df, "company").show()
# Group companies with fewer than 5% of the trips under "Other", computing the result once
dfTransformed = replaceRareValues(dfTransformed, "company", 5)
company_pct = percentageOfValues(dfTransformed, "company")
company_pct.show()
company_pct.toPandas().to_csv("C:/Users/mario/Maestria/Grandes Cantidades de Datos/VariablesCaracteristicas/companyGrouped.csv", index=False)

+--------------------+-------+----------+
|             company|  count|percentage|
+--------------------+-------+----------+
|3556 - 36214 RC A...|   2051|      0.03|
|     Chicago Taxicab|  16466|      0.21|
|4053 - 40193 Adwa...|   1064|      0.01|
|Taxi Affiliation ...|1354304|      17.1|
|             Top Cab|  23427|       0.3|
|         5 Star Taxi| 366649|      4.63|
|   Metro Jet Taxi A.|   3598|      0.05|
|           U Taxicab|  27154|      0.34|
|           Flash Cab|1687588|     21.31|
|Choice Taxi Assoc...|  91353|      1.15|
|3591 - 63480 Chuk...|   5009|      0.06|
|5167 - 71969 5167...|   3375|      0.04|
|Chicago City Taxi...|  74478|      0.94|
|          Setare Inc|   6846|      0.09|
|            Sun Taxi| 866893|     10.95|
|Blue Ribbon Taxi ...| 304554|      3.85|
|Patriot Taxi Dba ...|   9888|      0.12|
|6574 - Babylon Ex...|   1825|      0.02|
|     Petani Cab Corp|   1031|      0.01|
|    Medallion Leasin| 206500|      2.61|
+--------------------+-------+----------+

### **Creating Partitioning Rules**

#### **Assuming Dependence**

In [None]:
grouped = dfTransformed.groupBy("Pickup Community Area", "Dropoff Community Area","payment_group","company").count()
total_count = dfTransformed.count()

In [None]:
grouped_with_probs = grouped.withColumn("probability", F.col("count") / total_count)

In [None]:
grouped_with_probs.show()

In [None]:
grouped_with_probs.toPandas().to_csv("C:/Users/mario/Maestria/Grandes Cantidades de Datos/VariablesCaracteristicas/generalGrouped.csv", index=False)
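
One possible use of these joint probabilities, stated here as an assumption rather than as the notebook's method, is to treat each combination of values as a stratum and size a sample in proportion to its probability. The sketch below illustrates that idea in pure Python. The function names, the `rows` data, and the sample size of 50 are all hypothetical.

```python
from collections import Counter

def joint_probabilities(rows):
    """Empirical joint probability of each distinct combination of values."""
    counts = Counter(rows)
    total = len(rows)
    return {combo: n / total for combo, n in counts.items()}

def proportional_allocation(probabilities, sample_size):
    """Rows to draw from each stratum, proportional to its probability (rounded)."""
    return {combo: round(p * sample_size) for combo, p in probabilities.items()}

# Hypothetical (payment_group, company) pairs
rows = (
    [("Cash", "Flash Cab")] * 40
    + [("Credit Card", "Flash Cab")] * 30
    + [("Cash", "Sun Taxi")] * 20
    + [("Mobile", "Sun Taxi")] * 10
)
probs = joint_probabilities(rows)
allocation = proportional_allocation(probs, sample_size=50)
print(allocation)
```

Because each stratum size is rounded independently, the sizes may not always add up exactly to the requested sample size, and very small strata can round down to zero.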

#### **Assuming Independence**

In [11]:
columnsToCheck = ["Pickup Community Area", "Dropoff Community Area","payment_group","company"]
values = []
total_count = dfTransformed.count()
for column in columnsToCheck:
    # Marginal distribution of the column, as a percentage rounded to two decimals
    result_df = (
        dfTransformed.groupBy(column)
        .agg(count("*").alias("count"))
        .withColumn("percentage", round((col("count") / lit(total_count)) * 100, 2))
        .select(column, "percentage")
    )
    # Collect [value, percentage] pairs; each column has only a few distinct values
    values.append([list(row) for row in result_df.collect()])

In [17]:
# Convert each column's [value, percentage] pairs into {value: probability}
dicts = [{val: pct / 100.0 for val, pct in col_vals} for col_vals in values]

# Cartesian product of the observed values of every column
keys = [list(d.keys()) for d in dicts]
combinations = list(product(*keys))

# Under independence, the joint probability is the product of the marginal probabilities
results = []
for combo in combinations:
    prob = 1.0
    for i, val in enumerate(combo):
        prob *= dicts[i][val]

    # One record per combination: its column values plus the joint probability
    results.append({**{f"Column{i+1}": combo[i] for i in range(len(combo))}, "probability": prob})

# Convert the results to a pandas DataFrame and display it
df_result = pd.DataFrame(results)
print(df_result)

    Column1 Column2 Column3                       Column4  probability
0         8       8    Otro     Taxi Affiliation Services     0.001333
1         8       8    Otro                     Flash Cab     0.001661
2         8       8    Otro                      Sun Taxi     0.000853
3         8       8    Otro                         Other     0.001871
4         8       8    Otro          Chicago Independents     0.000463
..      ...     ...     ...                           ...          ...
695      76      76  Mobile                      Sun Taxi     0.000203
696      76      76  Mobile                         Other     0.000446
697      76      76  Mobile          Chicago Independents     0.000110
698      76      76  Mobile  Taxicab Insurance Agency Llc     0.000209
699      76      76  Mobile                  City Service     0.000175

[700 rows x 5 columns]


In [18]:
df_result.to_csv("C:/Users/mario/Maestria/Grandes Cantidades de Datos/VariablesCaracteristicas/generalGroupedIndependent.csv", index=False)
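
To compare the two assumptions, the following pure-Python sketch computes the joint probabilities implied by independence (the product of the marginals) next to the observed joint frequencies for a small, hypothetical set of `(payment_group, company)` pairs. The function names and data are illustrative only.

```python
from collections import Counter
from itertools import product

def marginal(rows, index):
    """Marginal probability of each value found at position `index`."""
    counts = Counter(row[index] for row in rows)
    return {value: n / len(rows) for value, n in counts.items()}

def independent_joint(rows):
    """Joint probabilities obtained by multiplying the marginals of every position."""
    marginals = [marginal(rows, i) for i in range(len(rows[0]))]
    joint = {}
    for combo in product(*marginals):
        prob = 1.0
        for i, value in enumerate(combo):
            prob *= marginals[i][value]
        joint[combo] = prob
    return joint

# Hypothetical (payment_group, company) pairs
rows = [("Cash", "Flash Cab")] * 40 + [("Cash", "Sun Taxi")] * 10 + [("Mobile", "Sun Taxi")] * 50
observed = {combo: n / len(rows) for combo, n in Counter(rows).items()}
expected = independent_joint(rows)
for combo in sorted(expected):
    # Columns: combination, probability under independence, observed probability
    print(combo, round(expected[combo], 2), round(observed.get(combo, 0.0), 2))
```

In this toy data the independence assumption assigns a positive probability to the pair `("Mobile", "Flash Cab")`, which never occurs in the observed rows. That shows how the two approaches can disagree when the variables are related.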