In [1]:
from BucketS3Config import *
import pandas as pd
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import os, json
import glob
from pyspark.sql import SQLContext
import pandas as pd
from pyspark.sql.functions import upper, col
import pyspark.sql.functions as F

In [2]:
# Criação da sessão Spark
spark = SparkSession \
        .builder \
        .appName('Processo Seletivo - DataSprints') \
        .getOrCreate()

In [3]:
# Criação do contexto Spark SQL   
sqlContext = SQLContext(spark)

In [4]:
# Gerando a lista de arquivos JSON a serem lidos para juntar os dados das viagens
def createFileList(path):
    json_pattern = os.path.join(path,'*.json')
    file_list = glob.glob(json_pattern)
    return file_list

In [5]:
# Calcular diferença das datas em segundos
def dateTimeDiff(dropoff_time,pickup_time):
    timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
    timeDiff = (F.unix_timestamp(dropoff_time, format=timeFmt) - F.unix_timestamp(pickup_time, format=timeFmt))
    return timeDiff

In [6]:
file_list = createFileList('spark_warehouse/datasets/trips/')

In [7]:
fields = [
    StructField("vendor_id", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("pickup_longitude", FloatType(), True),
    StructField("pickup_latitude", FloatType(), True),
    StructField("rate_code", IntegerType(), True),
    StructField("store_and_fwd_flag", FloatType(), True),
    StructField("dropoff_longitude", FloatType(), True),
    StructField("dropoff_latitude", FloatType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", FloatType(), True),
    StructField("surcharge", FloatType(), True),
    StructField("tip_amount", IntegerType(), True),
    StructField("tolls_amount", FloatType(), True),
    StructField("total_amount", FloatType(), True)
]

schema = StructType(fields)

dataset = spark.read.json(file_list, schema=schema)

In [8]:
dataset.show()

+---------+--------------------+--------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+----------+------------+------------+
|vendor_id|     pickup_datetime|    dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|tip_amount|tolls_amount|total_amount|
+---------+--------------------+--------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+----------+------------+------------+
|      CMT|2009-04-21 19:03:...|2009-04-21 19:04:...|              2|          0.8|       -74.00411|       40.74295|     null|              null|        -73.99471|        40.74795|        Cash|        5.4|      0.0|         0|         0.0|  

In [9]:
# Todas as formas de pagamento em minúsculo
dataset = dataset.withColumn('payment_type', upper(col('payment_type')))

In [10]:
# Duração da viagem em segundos
dataset = dataset.withColumn("duration", dateTimeDiff(col('dropoff_datetime'),col('pickup_datetime')) )

In [11]:
# Mês da viagem
dataset = dataset.withColumn("month_travel", F.month(col('pickup_datetime')) )

In [12]:
# Ano da viagem
dataset = dataset.withColumn("year_travel", F.year(col('pickup_datetime')) )

In [13]:
# Dia da semana da viagem
dataset = dataset.withColumn("weekday", F.date_format('pickup_datetime', 'E'))

In [14]:
# Criando a tabela trips 
dataset.createOrReplaceTempView("trips");

In [15]:
filename = 'datasets/trips_dataset'

In [16]:
#dataset.select('*').write.format('com.databricks.spark.csv').options(codec="org.apache.hadoop.io.compress.GzipCodec").save(filename)
#dataset.select('*').write.mode('overwrite').format('com.databricks.spark.csv').save(filename)

# dataset.select("*") \
# .write.format('com.databricks.spark.csv') \
# .mode('overwrite') \
# .save(filename)

In [None]:
dataset.repartition(1).write.mode('overwrite').csv(filename, sep=';',header = 'true')

In [17]:
saveDirectoryToS3(filename)

Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00000-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00013-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00012-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00003-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00011-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00002-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets/trips_dataset/part-00004-d1c52d9f-50de-49c7-b58c-67469e3ca6ac-c000.csv
Arquivo adicionado ao bucket s3://projeto-datasprints/datasets