In [7]:
# Path of the taxi-zone lookup CSV in the Azure storage account
file_path = "abfss://users@storejaut.dfs.core.windows.net/synapse/workspaces/synapse-ejemplo-jaut/taxi_zone_lookup.csv"

# Load the CSV into a Spark DataFrame: the first row is the header and the
# column types are inferred from the data
lookupexample = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(file_path)
)

# Inspect the inferred schema, then preview the first rows
lookupexample.printSchema()
lookupexample.show()




StatementMeta(clusterprueba, 0, 7, Finished, Available, Finished)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bat

In [9]:
# Azure Blob storage coordinates of the yellow-taxi dataset
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""  # empty: no SAS credential is supplied

# Build the wasbs:// URL and register the SAS token for that container so
# Spark can read the blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (
    blob_container_name,
    blob_account_name,
    blob_relative_path,
)
spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token,
)
print('Remote blob path: ' + wasbs_path)

# Reading parquet is lazy: this only defines the DataFrame, no rows are loaded yet
yellowtaxis = spark.read.parquet(wasbs_path)
print('Register the DataFrame as a SQL temporary view: source')
yellowtaxis.createOrReplaceTempView('source')

# Preview the first rows
yellowtaxis.show()

StatementMeta(clusterprueba, 0, 9, Finished, Available, Finished)

Remote blob path: wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow
Register the DataFrame as a SQL temporary view: source
+--------+-------------------+-------------------+--------------+------------+------------+------------+----------+---------+----------+---------+----------+---------------+-----------+----------+-----+------+--------------------+---------+-----------+-----------+------+-------+
|vendorID| tpepPickupDateTime|tpepDropoffDateTime|passengerCount|tripDistance|puLocationId|doLocationId|  startLon| startLat|    endLon|   endLat|rateCodeId|storeAndFwdFlag|paymentType|fareAmount|extra|mtaTax|improvementSurcharge|tipAmount|tollsAmount|totalAmount|puYear|puMonth|
+--------+-------------------+-------------------+--------------+------------+------------+------------+----------+---------+----------+---------+----------+---------------+-----------+----------+-----+------+--------------------+---------+-----------+-----------+------+-------+
|     CMT|2012-02-29 2

In [10]:
# Count the rows of the yellow-taxi DataFrame (count() is a Spark action, so this runs a job)
num_filas = yellowtaxis.count()

# Print the row count
print(f"El DataFrame tiene {num_filas} filas.")

StatementMeta(clusterprueba, 0, 10, Finished, Available, Finished)

El DataFrame tiene 1571671152 filas.


In [11]:
#### DO NOT CHANGE ANYTHING IN THIS CELL ####

from pyspark.sql.functions import col

def load_data():
    """Return the notebook-level DataFrames as ``(trips, lookup)``.

    ``trips`` is the global ``yellowtaxis`` and ``lookup`` is the global
    ``lookupexample``; both must already be defined when this is called.
    Also prints the number of rows in ``trips`` (``count()`` is a Spark
    action, so this runs a job over the trips data).
    """
    trips = yellowtaxis
    
    print("Trip Count: ", trips.count())
    lookup = lookupexample
    return trips, lookup

def main():
    """Run the whole pipeline and display the final table.

    Steps, in order: print ``user()``, load the data, keep long trips,
    derive the Manhattan drop-off table from those filtered trips, compute
    the weighted profit, and build the final output, which is shown with
    ``show()``.

    ``user``, ``long_trips``, ``manhattan_trips``, ``weighted_profit`` and
    ``final_output`` are resolved at call time, so they must be defined
    before ``main()`` is invoked.
    """
    # Runs your functions implemented above.
    
    print(user())
    trips, lookup = load_data()
    trips = long_trips(trips)
    mtrips = manhattan_trips(trips, lookup)
    wp = weighted_profit(trips, mtrips)
    final = final_output(wp,lookup)
    
    # Outputs the results for you to visually see
    final.show()
    
    # Writes out as a CSV to your bucket.
    #final.write.csv(bucket)

StatementMeta(clusterprueba, 0, 11, Finished, Available, Finished)

In [12]:
def user():
    """Return the author's name as a string."""
    author = 'Jorge Arturo Uc Torres'
    return author

StatementMeta(clusterprueba, 0, 12, Finished, Available, Finished)

In [13]:
def long_trips(trips):
    """Keep only the trips whose ``tripDistance`` is at least 2.

    Returns a DataFrame with the same schema as ``trips``.
    """
    is_long = col("tripDistance") >= 2
    return trips.where(is_long)

StatementMeta(clusterprueba, 0, 13, Finished, Available, Finished)

In [14]:
from pyspark.sql import functions as F
def manhattan_trips(trips, lookup):
    """Return the 20 Manhattan drop-off locations with the most trips.

    ``lookup`` rows whose ``Borough`` is "Manhattan" are joined to ``trips``
    on the drop-off location, the joined rows are counted per
    ``doLocationId``, and the 20 largest counts are kept.

    Returns a cached DataFrame with columns ``doLocationId`` and ``pcount``.

    NOTE(review): ``pcount`` is a row count of the joined trips, not a sum of
    ``passengerCount`` -- confirm this is the intended popularity measure.
    """
    manhattan_ids = lookup.where(F.col("Borough") == "Manhattan").select("LocationID")
    dropoffs_in_manhattan = trips.join(
        manhattan_ids,
        trips["doLocationId"] == manhattan_ids["LocationID"],
        how="inner",
    )
    per_location = dropoffs_in_manhattan.groupBy("doLocationId").agg(
        F.count("*").alias("pcount")
    )
    # Cached because the caller reuses this small table in a later join.
    return per_location.orderBy(F.desc("pcount")).limit(20).cache()

StatementMeta(clusterprueba, 0, 14, Finished, Available, Finished)

In [15]:
from pyspark.sql import functions as F
def weighted_profit(trips, mtrips):
    """Compute a weighted profit for every pickup location.

    For each ``puLocationId`` in ``trips``::

        weighted_profit = avg(totalAmount)
                          * (trips dropped off at a location in mtrips)
                          / (all trips from that pickup location)

    Args:
        trips: DataFrame with at least the columns ``puLocationId``,
            ``doLocationId`` and ``totalAmount``.
        mtrips: DataFrame with a ``doLocationId`` column listing the popular
            drop-off locations; any other columns are ignored.

    Returns:
        DataFrame with columns ``puLocationId`` and ``weighted_profit``.
        Pickup locations with no trip to a popular drop-off get 0.0
        rather than null.
    """
    # Average amount and trip count share the same grouping, so compute both
    # in one aggregation instead of two passes over the trips plus a join.
    pickup_stats = (
        trips.groupBy("puLocationId")
        .agg(
            F.avg("totalAmount").alias("avg_total_amount"),
            F.count("*").alias("total_trip_count"),
        )
        # Trips without a pickup location cannot be attributed to any zone;
        # drop the null-key group explicitly.
        .where(F.col("puLocationId").isNotNull())
    )

    # A semi join keeps each trip at most once, even if mtrips repeats an id.
    # Joining by column name avoids comparing trips["doLocationId"] with a
    # column that may itself be derived from trips (ambiguous self-join).
    popular_dropoffs = mtrips.select("doLocationId")
    popular_counts = (
        trips.join(popular_dropoffs, on="doLocationId", how="left_semi")
        .groupBy("puLocationId")
        .agg(F.count("*").alias("count_popular_dropoffs"))
    )

    return (
        pickup_stats.join(popular_counts, on="puLocationId", how="left")
        # No matching row in the left join means zero popular drop-offs.
        .fillna(0, subset=["count_popular_dropoffs"])
        .withColumn(
            "proportion",
            F.col("count_popular_dropoffs") / F.col("total_trip_count"),
        )
        .withColumn(
            "weighted_profit",
            F.col("avg_total_amount") * F.col("proportion"),
        )
        .select("puLocationId", "weighted_profit")
    )

StatementMeta(clusterprueba, 0, 15, Finished, Available, Finished)

In [16]:
def final_output(wp, lookup):
    """Attach zone names to the weighted profits and return the 20 highest.

    ``wp`` is joined to ``lookup`` on ``puLocationId == LocationID``, the
    ``weighted_profit`` values are summed per (``Zone``, ``Borough``), and
    the 20 largest sums are kept in descending order.

    Returns a DataFrame with columns ``Zone``, ``Borough`` and
    ``weighted_income``.

    NOTE(review): the aggregated column is emitted as ``weighted_income``,
    while its input column is ``weighted_profit`` -- confirm which name
    downstream consumers expect.
    """
    with_zones = wp.join(lookup, wp["puLocationId"] == lookup["LocationID"], how="inner")
    per_zone = with_zones.groupBy("Zone", "Borough").agg(
        F.sum("weighted_profit").alias("weighted_income")
    )
    return per_zone.orderBy("weighted_income", ascending=False).limit(20)

StatementMeta(clusterprueba, 0, 16, Finished, Available, Finished)

In [17]:
# Run the full pipeline; main() prints the user name and trip count and shows the final table
main() 

StatementMeta(clusterprueba, 0, 17, Finished, Available, Finished)

Jorge Arturo Uc Torres
Trip Count:  1571671152
+--------------------+---------+------------------+
|                Zone|  Borough|   weighted_income|
+--------------------+---------+------------------+
|        Baisley Park|   Queens|31.205081273352793|
|       South Jamaica|   Queens|29.712567447367064|
|Flushing Meadows-...|   Queens|23.662750316881343|
|     Randalls Island|Manhattan|22.793999713582163|
|             Jamaica|   Queens| 21.42248840989114|
|Springfield Garde...|   Queens| 21.36362250031268|
|Briarwood/Jamaica...|   Queens| 19.01179378895169|
|   LaGuardia Airport|   Queens|18.580626766026285|
|              Corona|   Queens|18.363637834566422|
|         JFK Airport|   Queens|18.046507173731673|
|        Astoria Park|   Queens|17.088652456501663|
|         Jamaica Bay|   Queens|15.050362596914402|
|             Maspeth|   Queens| 13.04977086839208|
| Morningside Heights|Manhattan|12.064342534405057|
|   Battery Park City|Manhattan|11.922662352821469|
|        Battery 