### Instalación de Spark, creación de la sesión y montaje de la unidad personal de Google Drive

In [None]:
# Mount the user's Google Drive at /content/drive so that files stored there
# (the course datasets and the exported results) are reachable from this notebook.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

In [None]:
# List the contents of the current working directory (quick sanity check of the Colab filesystem).
!ls

In [None]:
# Install a headless OpenJDK 8 for Spark's JVM; JAVA_HOME is later pointed at
# /usr/lib/jvm/java-8-openjdk-amd64. apt's output is silenced (-qq, stdout discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget https://downloads.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

In [None]:
!wget -q http://apache.mirrors.pair.com/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

In [None]:
# List the working directory again, e.g. to confirm the Spark tarball is present before extracting it.
!ls

In [None]:
# Extract the Spark distribution into the current directory. SPARK_HOME is later set to
# /content/spark-3.5.3-bin-hadoop3, so this assumes the working directory is /content (Colab default).
!tar xf spark-3.5.3-bin-hadoop3.tgz

In [None]:
!pip install -q findspark
!pip install matplotlib

In [None]:
import os

import findspark
import matplotlib.pyplot as plt
import numpy as np

# Point to the JDK installed with apt and to the extracted Spark distribution.
# Both variables must be set before findspark.init() is called.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.3-bin-hadoop3",
})

# Make the pyspark package shipped inside SPARK_HOME importable.
findspark.init()

In [None]:
# Display the Spark installation directory findspark resolved (should match SPARK_HOME).
findspark.find()

In [None]:
from scipy import stats
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer

# Create (or reuse, via getOrCreate) the SparkSession used throughout the notebook.
# "local[*]" runs one worker thread per available core; plain "local" would
# execute every job on a single thread.
# The Spark UI is served on port 4050 instead of the default 4040.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Pyspark_SQL")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the active SparkSession (its rich repr summarises version, master and app name).
spark

DESARROLLO, ejercicio 4:

Cargar el conjunto de datos en un dataframe

In [None]:
# Folder on the mounted Google Drive that holds the course datasets.
data_path = '/content/drive/MyDrive/TokioSchool/data_curso/'

# Read the CSV using its first row as header and letting Spark infer the column types.
df = spark.read.csv(
    data_path + 'Air_Traffic_Passenger_Statistics.csv',
    header=True,
    inferSchema=True,
)

ANÁLISIS DESCRIPTIVO

- MEDIA DE CADA ELEMENTO

Media del número de pasajeros según la región, la categoría de precio y el mes

In [None]:
# Mean passenger count grouped by GEO region, price category and month.
# DataFrame.show() only prints and returns None, so the aggregated DataFrames are
# stored in the variables first and printed separately (assigning the result of
# .show() would leave every variable set to None).
df_region = df.groupBy('GEO Region').agg(F.mean('Passenger Count').alias('mean_passenger_count'))
df_price = df.groupBy('Price Category Code').agg(F.mean('Passenger Count').alias('mean_passenger_count'))
df_month = df.groupBy('Month').agg(F.mean('Passenger Count').alias('mean_passenger_count'))

for summary_df in (df_region, df_price, df_month):
    summary_df.show()

- DESVIACIÓN TÍPICA DE CADA ELEMENTO

Desviación estándar del número de pasajeros según la región, la categoría de precio y el mes

In [None]:
# Standard deviation of the passenger count grouped by GEO region, price category and month.
# DataFrame.show() only prints and returns None, so the aggregated DataFrames are
# stored in the variables first and printed separately (assigning the result of
# .show() would leave every variable set to None).
df_region = df.groupBy('GEO Region').agg(F.std('Passenger Count').alias('std_passenger_count'))
df_price = df.groupBy('Price Category Code').agg(F.std('Passenger Count').alias('std_passenger_count'))
df_month = df.groupBy('Month').agg(F.std('Passenger Count').alias('std_passenger_count'))

for summary_df in (df_region, df_price, df_month):
    summary_df.show()

- Análisis de correlación
- Matriz de correlación en el resultado

In [None]:
# Encode every categorical column as a numeric "<column> Index" column so that it
# can take part in the correlation analysis. With StringIndexer's default ordering
# (frequencyDesc) the most frequent label gets index 0.0.
categorical_cols = ['Operating Airline', 'GEO Summary', 'GEO Region', 'Activity Type Code',
                    'Price Category Code', 'Terminal', 'Boarding Area']
index_cols = [f'{col_name} Index' for col_name in categorical_cols]

# StringIndexer refuses to write an output column that already exists, so drop
# any index columns left by a previous run to keep this cell re-runnable
# (drop() silently ignores names that are not present).
df = df.drop(*index_cols)

# One multi-column StringIndexer (Spark >= 3.0) replaces seven copy-pasted
# single-column indexers and fits all columns together.
indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_cols)
df = indexer.fit(df).transform(df)

In [None]:
# Map English month names to their calendar number (January -> 1 ... December -> 12).
# Any other value, or a null month, yields null, just like an unmatched when-chain.
month_names = ['January', 'February', 'March', 'April', 'May', 'June',
               'July', 'August', 'September', 'October', 'November', 'December']

# Build when(Month == 'January', 1).when(Month == 'February', 2)... incrementally.
month_expr = None
for month_number, month_name in enumerate(month_names, start=1):
    is_month = df['Month'] == month_name
    if month_expr is None:
        month_expr = when(is_month, month_number)
    else:
        month_expr = month_expr.when(is_month, month_number)

df = df.withColumn('month_numero', month_expr)

In [None]:
import pandas as pd
import seaborn as sns

# Numeric (or index-encoded) columns whose pairwise Pearson correlations are computed.
numeric_cols = ['Operating Airline Index', 'GEO Summary Index', 'GEO Region Index', 'Activity Type Code Index',
                'Price Category Code Index', 'Terminal Index', 'Boarding Area Index', 'Passenger Count', 'Year',
                'month_numero']

# Correlation is symmetric (corr(a, b) == corr(b, a)), so each unordered pair,
# diagonal included, is computed once and mirrored. Every df.stat.corr call runs
# a separate Spark job, so this halves the work compared with visiting all ordered pairs.
correlation_matrix = pd.DataFrame(index=numeric_cols, columns=numeric_cols, dtype=float)
for i, col1 in enumerate(numeric_cols):
    for col2 in numeric_cols[i:]:
        corr_value = df.stat.corr(col1, col2)
        correlation_matrix.loc[col1, col2] = corr_value
        correlation_matrix.loc[col2, col1] = corr_value

# Long-format [col1, col2, corr] rows for every ordered pair (kept for compatibility).
correlation_data = [
    [col_a, col_b, float(correlation_matrix.loc[col_a, col_b])]
    for col_a in numeric_cols
    for col_b in numeric_cols
]

# The section asks for a correlation matrix, so show it as an annotated heatmap.
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, fmt='.2f', cmap='coolwarm', vmin=-1, vmax=1, ax=ax)
ax.set_title('Matriz de correlación')
plt.show()

correlation_matrix.round(2)

- Algoritmo: árboles de decisión potenciados por gradiente (Gradient-Boosted Trees, `GBTRegressor`)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

# Reuse the notebook's SparkSession: getOrCreate() returns the session that is
# already running instead of starting a second one.
spark = SparkSession.builder.appName("Air Traffic Passenger Prediction").getOrCreate()

# Fixed seed so that the train/test split (and therefore the reported R2) is reproducible.
SEED = 42

# Reload the raw CSV without schema inference: every column is read as a string,
# and the model inputs are cast to float explicitly below.
df = spark.read.option("header", True).csv(data_path + 'Air_Traffic_Passenger_Statistics.csv')

# Encode every categorical column as "<column> Index" (default frequencyDesc
# ordering: most frequent label -> 0.0) with one multi-column StringIndexer.
# NOTE: 'Month Index' is therefore ordered by frequency, not by calendar month.
categorical_cols = ['Operating Airline', 'Published Airline', 'GEO Summary', 'Activity Type Code',
                    'Price Category Code', 'Terminal', 'Boarding Area',
                    'Adjusted Activity Type Code', 'Month']
index_cols = [f'{col_name} Index' for col_name in categorical_cols]
indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_cols)
df = indexer.fit(df).transform(df)

# Cast the numeric inputs, the label and every index column to float in a single projection.
# NOTE(review): the cast appears to also drop the nominal ML metadata that StringIndexer
# attaches, so the trees treat the indices as plain ordered numbers (and high-cardinality
# columns do not trip GBT's maxBins check) — confirm before removing these casts.
float_cols = ['Activity Period', 'Passenger Count', 'Year'] + index_cols
df = df.withColumns({col_name: F.col(col_name).cast('float') for col_name in float_cols})

# Assemble the model inputs into the 'features' vector column.
va = VectorAssembler(inputCols=['Activity Period', 'Operating Airline Index', 'Published Airline Index',
                                'GEO Summary Index', 'Activity Type Code Index', 'Price Category Code Index',
                                'Terminal Index', 'Boarding Area Index', 'Year',
                                'Adjusted Activity Type Code Index', 'Month Index'],
                     outputCol='features')
df = va.transform(df)

# 80/20 train/test split; seeded so that repeated runs evaluate on the same rows.
train, test = df.randomSplit([0.8, 0.2], seed=SEED)

# Gradient-boosted trees regressor (an ensemble of decision trees). The variable keeps
# its historical name `rf` for backward compatibility; it is not a random forest.
rf = GBTRegressor(featuresCol='features', labelCol='Passenger Count', maxIter=100, seed=SEED)
rf_model = rf.fit(train)

# Predict on the held-out rows and score the fit with the coefficient of determination.
predictions = rf_model.transform(test)

evaluator = RegressionEvaluator(labelCol="Passenger Count", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print(f"R2: {r2}")

from pyspark.sql.types import StringType

def vector_to_string(v):
    """Return ``str(v)``, the textual form of an ML vector, so it can be stored in a CSV cell."""
    return str(v)


# The CSV writer cannot store vector columns, so 'features' is serialised to text
# through a Python UDF.
vector_to_string_udf = udf(vector_to_string, StringType())

df_with_string_features = predictions.withColumn(
    'features_str',
    vector_to_string_udf(predictions['features']),
)

# Keep only the serialised features, the true label and the model's prediction.
df_export = df_with_string_features.select('features_str', 'Passenger Count', 'prediction')

# Write a single CSV part file (coalesce(1)) with a header row.
# NOTE(review): overwrite mode replaces the whole target directory, including any
# other files already stored in data_algoritmo/ — confirm that this is intended.
df_export.coalesce(1).write.csv(
    "/content/drive/MyDrive/TokioSchool/data_algoritmo/",
    mode="overwrite",
    header="true",
)