## Tubería con Auto Loader

### Acceso a Blob Storage

In [0]:
import pyspark

In [0]:
# Storage account configuration. The access key is read from the environment instead of
# being hardcoded: a key committed to a notebook is readable by anyone with access to it.
# NOTE: the key previously stored in this cell was committed in plain text and must be rotated.
import os

storage_account_name = "pfcrudo"
storage_account_key = os.environ.get("PFCRUDO_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "Set the PFCRUDO_STORAGE_ACCOUNT_KEY environment variable "
        "(for example from a Databricks secret scope) before running this notebook."
    )

In [0]:
# Hand the account key to Spark under the per-account `fs.azure.account.key.*` setting,
# which the wasbs:// paths on this account below rely on.
account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(account_key_conf, storage_account_key)

In [0]:
# List the container contents (same listing as `%fs ls`), building the path from the
# configured account name instead of repeating it as a literal.
display(dbutils.fs.ls(f"wasbs://yellowtaxis@{storage_account_name}.blob.core.windows.net/"))

path,name,size,modificationTime
wasbs://yellowtaxis@pfcrudo.blob.core.windows.net/yellow_2021_12.parquet,yellow_2021_12.parquet,49639052,1681847938000


### Leer archivo y obtener el esquema general

In [0]:
# Batch-read one parquet file from the container so its schema can be inspected and saved.
container = "yellowtaxis"
dataframe = (
    spark.read
    .format("parquet")
    .load(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/yellow_2021_12.parquet")
)

In [0]:
# Check that the schema is visible: display the StructType read from the parquet file.
dataframe.schema

Out[257]: StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True)])

In [0]:
# Confirm what kind of object the schema is (expected: pyspark.sql.types.StructType).
dataframe.schema.__class__

Out[258]: pyspark.sql.types.StructType

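Establecer el esquema deseado (para reducir memoria). Nota: la celda siguiente solo importa los tipos; todavía no se define un esquema reducido, y la lectura en streaming usa el esquema original del archivo (exportado a JSON a continuación).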

In [0]:
from pyspark.sql.types import StructType, StructField, ByteType, TimestampType, ShortType, StringType, FloatType

Exportar el esquema del archivo a JSON:

In [0]:
# Save the file's schema as JSON in Databricks storage (/dbfs) so it can be reloaded later.
import json

exported_schema = dataframe.schema.jsonValue()
with open("/dbfs/taxischema.json", mode="w") as schema_out:
    json.dump(exported_schema, schema_out)

In [0]:
# Reload the schema from the JSON file to confirm it round-trips; `dataset_schema` is the
# schema handed to the Auto Loader stream below.
with open("/dbfs/taxischema.json") as schema_in:
    loaded_schema = json.load(schema_in)
dataset_schema = StructType.fromJson(loaded_schema)
dataset_schema

Out[261]: StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True)])

### Lectura

Auto Loader

In [0]:
# Auto Loader ("cloudFiles") stream over the parquet files in the container, with
# file-notification mode disabled and the schema reloaded above (no inference).
# The source path is built from the configured account/container names instead of
# repeating them as literals.
stream_in = (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.useNotifications", False)
                .option("cloudFiles.format", "parquet")
                .schema(dataset_schema)
                .load(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"))

### Transformación

In [0]:
from pyspark.sql.functions import col, year, mode, abs

In [0]:
# Drop columns not needed downstream and replace nulls with 0 in selected amount columns.
DROPPED_COLUMNS = ("VendorID", "RatecodeID", "store_and_fwd_flag")
ZERO_FILL_COLUMNS = [
    "fare_amount",
    "extra",
    "mta_tax",
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
]
df = stream_in.drop(*DROPPED_COLUMNS).fillna(0, subset=ZERO_FILL_COLUMNS)

In [0]:
# Ignore trips whose pickup or dropoff location is one of the unknown location IDs.
# Rows with a null location ID are dropped as well (the comparison evaluates to null).
UNKNOWN_LOCATION_IDS = (264, 265)
known_pickup = ~col("PULocationID").isin(*UNKNOWN_LOCATION_IDS)
known_dropoff = ~col("DOLocationID").isin(*UNKNOWN_LOCATION_IDS)
df = df.filter(known_pickup & known_dropoff)

In [0]:
# Replace each numeric count/amount column with its absolute value.
# `abs` here is pyspark.sql.functions.abs (imported above, shadowing the Python builtin).
# A single `withColumns` call adds one projection to the plan instead of eleven chained
# `withColumn` projections; existing columns are replaced in place, keeping column order.
ABS_COLUMNS = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
    "airport_fee",
]
df = df.withColumns({name: abs(col(name)) for name in ABS_COLUMNS})

### Carga a SQL Database

Conexión a SQL Database:

In [0]:
import os


def _require_env(name):
    """Return environment variable `name`, raising RuntimeError if it is unset or empty.

    Keeps database credentials out of the notebook source.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; configure it (for example from a "
            "Databricks secret scope) before starting the stream."
        )
    return value


def output_sqldb(batch_df, batch_id):
    """foreachBatch sink: keep the batch's most frequent pickup year and append it to SQL.

    Args:
        batch_df: the micro-batch DataFrame passed by `foreachBatch`.
        batch_id: the micro-batch id passed by `foreachBatch` (not used).

    Raises:
        RuntimeError: if SQLDB_USER or SQLDB_PASSWORD is not set in the environment.

    NOTE(review): the modal year is computed over the whole micro-batch. If a single batch
    contains files from more than one year (the stream reads the whole container), rows of
    the less frequent years are discarded -- TODO confirm this is intended.
    """
    # Connection properties. Credentials come from the environment instead of being
    # hardcoded; the password previously committed in this cell must be rotated.
    jdbcHostname = "pfservidor.database.windows.net"
    jdbcPort = 1433
    jdbcDatabase = "yellowtaxis"
    jdbcUsername = _require_env("SQLDB_USER")
    jdbcPassword = _require_env("SQLDB_PASSWORD")
    jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

    # Credentials are passed as separate JDBC options rather than embedded in the URL.
    jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase}"
    dbtable = "trips"

    # The batch is used by several actions (emptiness check, modal-year aggregate, write);
    # caching it avoids re-reading the source files for each one.
    batch_df = batch_df.persist()
    try:
        if batch_df.isEmpty():
            return None

        # Keep only rows whose pickup year equals the most frequent pickup year in the batch.
        right_year = batch_df.select(mode(year("tpep_pickup_datetime"))).head()[0]
        cleaned_df = batch_df.filter(year("tpep_pickup_datetime") == right_year)

        # Append the cleaned batch to the SQL Database table over JDBC.
        (cleaned_df.write
            .format("jdbc")
            .mode("append")
            .option("url", jdbcUrl)
            .option("dbtable", dbtable)
            .option("user", jdbcUsername)
            .option("password", jdbcPassword)
            .option("driver", jdbcDriver)
            .save())
    finally:
        batch_df.unpersist()


Auto Loader

In [0]:
# Run the stream as a one-shot job: process everything available when the query starts
# (progress tracked in the checkpoint), then stop. `availableNow` replaces the `once`
# trigger, deprecated since Spark 3.4 (the `mode` function imported above already needs 3.4+).
# Awaiting termination keeps this cell running until the load ends, so failures surface here.
checkpoint_path = f"wasbs://checkpoint@{storage_account_name}.blob.core.windows.net/"
stream_out = (df.writeStream
                .trigger(availableNow=True)
                .foreachBatch(output_sqldb)
                .option("checkpointLocation", checkpoint_path)
                .start())
stream_out.awaitTermination()