
# PySpark

### Installing and loading PySpark

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=431c8e84b174529f4deca834bee4bb25a504c3b7490b5408df575fc89d4f95ae
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('test_pyspark').getOrCreate()

### Required libraries

In [4]:
# Spark SQL data types, used when casting or declaring column types
from pyspark.sql.types import StringType, BooleanType, FloatType, IntegerType, DoubleType, DateType
# Column functions; the F.<name> form avoids name clashes
import pyspark.sql.functions as F
# Note: this import shadows Python's built-in sum, round, max and min for the rest of the notebook
from pyspark.sql.functions import sum, col, desc, asc, count, countDistinct, round, max, min, avg
from pyspark.sql.functions import to_timestamp, date_format
from pyspark.sql.window import Window

# Building blocks for custom ML transformers, estimators and pipelines
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasInputCols, HasOutputCols, Param, Params, TypeConverters
from pyspark import keyword_only
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml import Model
from pyspark.ml import Estimator

from datetime import datetime
import numpy as np

### Loading the data

In [6]:
!wget https://data-engineer-edvai.s3.amazonaws.com/yellow_tripdata_2021-01.csv

--2023-04-14 21:31:57--  https://data-engineer-edvai.s3.amazonaws.com/yellow_tripdata_2021-01.csv
Resolving data-engineer-edvai.s3.amazonaws.com (data-engineer-edvai.s3.amazonaws.com)... 52.217.18.84, 54.231.229.137, 52.216.136.3, ...
Connecting to data-engineer-edvai.s3.amazonaws.com (data-engineer-edvai.s3.amazonaws.com)|52.217.18.84|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 125981363 (120M) [text/csv]
Saving to: ‘yellow_tripdata_2021-01.csv’


2023-04-14 21:31:59 (65.5 MB/s) - ‘yellow_tripdata_2021-01.csv’ saved [125981363/125981363]



In [14]:
# Read every .csv in the working directory; with no schema, all columns load as strings
df = spark.read.option("header","true").csv("*.csv")

In [33]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [15]:
df.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2021-01-01 00:30:10|  2021-01-01 00:36:12|              1|         2.10|         1|                 N|         142|          43|           2|          8|    3|    0.5|         0|           0|                  0.3

### Insert into the payments table (VendorID, tpep_pickup_datetime, payment_type, total_amount) only the credit card payments

In [16]:
# Register the DataFrame as a temporary view so it can be queried with spark.sql
df.createOrReplaceTempView("yellow_tripdata")

In [17]:
df_5 = spark.sql("select VendorId, cast(tpep_pickup_datetime as date), cast(payment_type as int), total_amount from yellow_tripdata where payment_type = 1")

In [18]:
df_5.show(10)

+--------+--------------------+------------+------------+
|VendorId|tpep_pickup_datetime|payment_type|total_amount|
+--------+--------------------+------------+------------+
|       1|          2021-01-01|           1|       51.95|
|       1|          2021-01-01|           1|       36.35|
|       2|          2021-01-01|           1|       24.36|
|       1|          2021-01-01|           1|       14.15|
|       1|          2021-01-01|           1|       18.95|
|       2|          2021-01-01|           1|        24.3|
|       2|          2021-01-01|           1|       10.79|
|       2|          2021-01-01|           1|       14.16|
|       2|          2021-01-01|           1|        10.3|
|       2|          2021-01-01|           1|       12.09|
+--------+--------------------+------------+------------+
only showing top 10 rows



### Insert into the passengers table (tpep_pickup_datetime, passenger_count, total_amount) the records with more than 2 passengers and a trip total above 8 dollars

In [20]:
df_6 = spark.sql("select cast(tpep_pickup_datetime as date) as tpep_pickup_date, passenger_count, total_amount from yellow_tripdata where passenger_count > 2 and total_amount > 8")

In [21]:
df_6.show(10)

+----------------+---------------+------------+
|tpep_pickup_date|passenger_count|total_amount|
+----------------+---------------+------------+
|      2021-01-01|              3|        24.3|
|      2021-01-01|              5|       14.16|
|      2021-01-01|              3|         9.3|
|      2021-01-01|              4|        18.3|
|      2021-01-01|              4|        13.3|
|      2021-01-01|              3|        40.3|
|      2021-01-01|              5|        14.8|
|      2021-01-01|              3|       18.59|
|      2021-01-01|              3|       13.56|
|      2021-01-01|              3|        9.96|
+----------------+---------------+------------+
only showing top 10 rows



The values are close to, but do not exactly match, the real result.

### Insert into the tolls table (tpep_pickup_datetime, passenger_count, tolls_amount, total_amount) the records with tolls greater than 0.1 and more than 1 passenger

In [25]:
df_7 = spark.sql("select cast(tpep_pickup_datetime as date) as tpep_pickup_date, passenger_count, tolls_amount, total_amount from yellow_tripdata where tolls_amount > 0.1 and passenger_count > 1 ")

In [26]:
df_7.show(10)

+----------------+---------------+------------+------------+
|tpep_pickup_date|passenger_count|tolls_amount|total_amount|
+----------------+---------------+------------+------------+
|      2021-01-01|              2|        6.12|       33.92|
|      2021-01-01|              2|        6.12|       59.42|
|      2021-01-01|              2|        6.12|       35.92|
|      2021-01-01|              6|        6.12|        40.1|
|      2021-01-01|              3|        6.12|          54|
|      2021-01-01|              2|         2.8|        34.1|
|      2021-01-01|              4|        6.12|       61.42|
|      2021-01-01|              4|        6.12|       51.42|
|      2021-01-01|              2|       11.75|       12.05|
|      2021-01-01|              6|        6.12|       71.42|
+----------------+---------------+------------+------------+
only showing top 10 rows



### Insert into the congestion table (tpep_pickup_datetime, passenger_count, congestion_surcharge, total_amount) the records of trips that had congestion on 2021-01-18

In [44]:
df_8 = spark.sql("select cast(tpep_pickup_datetime as date) as tpep_pickup_date, passenger_count, congestion_surcharge, total_amount from yellow_tripdata where cast(tpep_pickup_datetime as date) = date '2021-01-18' and cast(congestion_surcharge as double) > 0")

In [None]:
df_8.show()


