In [1]:

!pip install pyarrow
!pip install fastparquet
!pip install s3fs
!pip install config
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyarrow
  Downloading pyarrow-15.0.2-cp39-cp39-manylinux_2_28_x86_64.whl (38.3 MB)
     |████████████████████████████████| 38.3 MB 40 kB/s             
[?25hCollecting numpy<2,>=1.16.6
  Downloading numpy-1.26.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
     |████████████████████████████████| 18.2 MB 115.4 MB/s            
[?25hInstalling collected packages: numpy, pyarrow
Successfully installed numpy-1.26.4 pyarrow-15.0.2
Defaulting to user installation because normal site-packages is not writeable
Collecting fastparquet
  Downloading fastparquet-2024.2.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
     |████████████████████████████████| 1.7 MB 6.9 MB/s            
[?25hCollecting pandas>=1.5.0
  Downloading pandas-2.2.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
     |████████████████████████████████| 13.0 MB 100.5 MB/s    

In [2]:
import os
import pyarrow.dataset as ds
import s3fs
import boto3
import config
from io import BytesIO
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import col, regexp_replace, when, isnull, trim, avg, count, sum as pyspark_sum, to_date
from pyspark.sql.types import StringType, IntegerType, DateType
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:
s3 = boto3.resource('s3')
spark = SparkSession.builder\
        .config("spark.jars.packages", "org.apache.spark:spark-hadoop-cloud_2.12:3.3.0")\
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
        .getOrCreate()

In [4]:
filePath = "s3a://taxis-ny-project-group-g/raw/zones/taxi_zones.csv"
df = spark.read.option("delimiter", ",").option("header", True).csv(filePath)

In [5]:
df.head(10)

[Row(OBJECTID='1', Shape_Leng='0.116357453189', the_geom='MULTIPOLYGON (((-74.18445299999996 40.694995999999904, -74.18448899999999 40.69509499999987, -74.18449799999996 40.69518499999987, -74.18438099999997 40.69587799999989, -74.18428199999994 40.6962109999999, -74.18402099999997 40.697074999999884, -74.18391299999996 40.69750699999986, -74.18375099999997 40.69779499999988, -74.18363399999998 40.6983259999999, -74.18356199999994 40.698451999999875, -74.18354399999998 40.69855999999988, -74.18350799999996 40.69870399999992, -74.18327399999998 40.70008999999988, -74.18315699999994 40.701214999999884, -74.18316599999997 40.702384999999886, -74.18313899999998 40.7026279999999, -74.18309399999998 40.7028529999999, -74.18299499999995 40.70315899999985, -74.18284199999994 40.70346499999989, -74.18264399999998 40.70373499999988, -74.18242799999996 40.70395099999992, -74.18220299999996 40.704139999999896, -74.18203199999994 40.70425699999987, -74.18180699999994 40.7043919999999, -74.181572999

In [7]:
columns = [col('OBJECTID').cast("int"),
         col('Shape_Leng').cast("float"),col('the_geom').cast("string"),col('Shape_Area').cast("float"),col('zone').cast("string"),
           col('LocationID').cast("int"),col('borough').cast("string")]

df = df.select(columns)

In [8]:
na_counts = df.select([pyspark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Exibindo o resultado
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
na_counts.show(truncate=False)

+--------+----------+--------+----------+----+----------+-------+
|OBJECTID|Shape_Leng|the_geom|Shape_Area|zone|LocationID|borough|
+--------+----------+--------+----------+----+----------+-------+
|0       |0         |0       |0         |0   |0         |0      |
+--------+----------+--------+----------+----+----------+-------+



In [9]:
df.head(10)

[Row(OBJECTID=1, Shape_Leng=0.11635745316743851, the_geom='MULTIPOLYGON (((-74.18445299999996 40.694995999999904, -74.18448899999999 40.69509499999987, -74.18449799999996 40.69518499999987, -74.18438099999997 40.69587799999989, -74.18428199999994 40.6962109999999, -74.18402099999997 40.697074999999884, -74.18391299999996 40.69750699999986, -74.18375099999997 40.69779499999988, -74.18363399999998 40.6983259999999, -74.18356199999994 40.698451999999875, -74.18354399999998 40.69855999999988, -74.18350799999996 40.69870399999992, -74.18327399999998 40.70008999999988, -74.18315699999994 40.701214999999884, -74.18316599999997 40.702384999999886, -74.18313899999998 40.7026279999999, -74.18309399999998 40.7028529999999, -74.18299499999995 40.70315899999985, -74.18284199999994 40.70346499999989, -74.18264399999998 40.70373499999988, -74.18242799999996 40.70395099999992, -74.18220299999996 40.704139999999896, -74.18203199999994 40.70425699999987, -74.18180699999994 40.7043919999999, -74.18157299

In [12]:
filePath = "s3a://taxis-ny-project-group-g/trusted/zones/taxi_zones"
df.write.mode('overwrite').parquet(filePath)

In [65]:
def TransferData(bucketSourceName, bucketTargetName, folderSourcePrefix, folderSourceName, folderTargetName):
    bucketSource = s3.Bucket(bucketSourceName)
    print(bucketSource)
    for obj in bucketSource.objects.filter(Prefix=folderSourcePrefix):
            source_filename = (obj.key).split('/')[-1]
            target_filename = "s3a://{}/{}/{}".format(bucketSourceName, folderSourceName, source_filename)
            print(target_filename)
            #Leitura de arquivo
            df_data = spark.read.option("header", True).parquet(target_filename)

            columns = [col('VendorID').cast("int"), "tpep_pickup_datetime", "tpep_dropoff_datetime",
             col('passenger_count').cast("int"),col('trip_distance').cast("float"),col('RatecodeID').cast("int"),col('store_and_fwd_flag').cast("string"),
               col('PULocationID').cast("int"),col('DOLocationID').cast("int"),col('payment_type').cast("int"),
              col('fare_amount').cast("float"),col('extra').cast("float"),col('mta_tax').cast("float"),col('tip_amount').cast("float"),col('tolls_amount').cast("float"),
              col('improvement_surcharge').cast("float"),col('total_amount').cast("float"),col('congestion_surcharge').cast("float"),col('Airport_fee').cast("float")]
            
            #Alteração de tipos de colunas
            df_data = df_data.select(columns)
            
            #Remoção de registros nulos
            df_data = df_data.filter(~df_data.RatecodeID.isNull())
            
            #Gravação de arquivo
            filePath = "s3a://{}/{}/{}".format(bucketTargetName, folderTargetName, source_filename.replace('.parquet',''))
            df_data.write.mode('overwrite').parquet(filePath)
            

bucketSourceName = 'taxis-ny-project-group-g'
bucketTargetName = 'taxis-ny-project-group-g'
folderTargetName = 'trusted/trip data'
folderSourceName = 'raw/trip data'

for year in range(2022,2024):
    print(year)
    folderSourcePrefix = "{}/{}{}".format('raw/trip data', 'yellow_tripdata_',year)
    print(folderSourcePrefix)
    TransferData(bucketSourceName,bucketTargetName,folderSourcePrefix,folderSourceName,folderTargetName)

2022
raw/trip data/yellow_tripdata_2022
s3.Bucket(name='taxis-ny-project-group-g')
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-01.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-02.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-03.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-04.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-05.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-06.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-07.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-08.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-09.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-10.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-11.parquet
s3a://taxis-ny-project-group-g/raw/trip data/yellow_tripdata_2022-12.p