In [1]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

## Configuramos las credenciales necesarias y la ruta del archivo .jar
Ese archivo .jar lo creamos en una carpeta __lib__ dentro de esta carpeta __notebooks__.

In [6]:
# Path to the GCP service-account JSON key used by the GCS connector.
# GOOGLE_APPLICATION_CREDENTIALS wins when it is set (and non-empty), so the
# notebook is not tied to one machine's home directory; the original absolute
# path is kept as the fallback so existing setups keep working unchanged.
credentials_location = (
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    or '/home/nlealiapp/.gc/projectonleali-649724cf41f9.json'
)

# Spark configuration for a local run on all available cores.
# - spark.jars points at the GCS connector jar; the path is relative, so it is
#   resolved against the directory the notebook kernel was started from.
# - the spark.hadoop.* entries enable service-account authentication and hand
#   the key file above to the Hadoop layer.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('Mipruebajar')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [7]:
# Reuse the running SparkContext if there is one, otherwise start it with `conf`.
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop configuration object backing this context (accessed through the
# underlying Java SparkContext).
hadoop_conf = sc._jsc.hadoopConfiguration()

# Settings that register the gs:// filesystem implementations and pass the
# service-account key to them. Applied in the listed order.
gcs_hadoop_settings = (
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
)
for setting_name, setting_value in gcs_hadoop_settings:
    hadoop_conf.set(setting_name, setting_value)

In [8]:
# Build (or fetch) the SparkSession using the configuration of the context
# created above, so both share the same settings.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

## Leemos el archivo de la nube

In [9]:
# Location of the input Parquet file in Google Cloud Storage.
green_trips_path = "gs://projectonleali-mibucketdataproc/data/green/2020/01/green_2020_01.parquet"

# Load the file into a DataFrame through the gs:// connector.
df_google = spark.read.parquet(green_trips_path)

                                                                                

In [10]:
# Preview the first 10 rows to confirm the file was read correctly.
df_google.show(n=10)

[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|       1.0|         264|         264|            5.0|          0.0|        3.5|  0.5|    0.

                                                                                

## Hacemos una operación simple para guardar el resultado en el storage

In [13]:
# Register the trips DataFrame as a temporary view so it can be queried by name
# from Spark SQL.
df_google.createOrReplaceTempView('mi_vw')

# Query text: sums total_amount per pickup location (aliased ZoneID) and per
# pickup timestamp truncated to the day (aliased date).
daily_totals_query = """
                          select PULocationID as ZoneID,
                          date_trunc('day', lpep_pickup_datetime) as date,
                          sum(total_amount) as totalAmount
                          from mi_vw
                          group by ZoneID, date
                          """

df_totalizado = spark.sql(daily_totals_query)

In [14]:
# Preview the aggregated totals (show()'s defaults spelled out: 20 rows,
# long values truncated).
df_totalizado.show(n=20, truncate=True)



+------+-------------------+------------------+
|ZoneID|               date|       totalAmount|
+------+-------------------+------------------+
|   174|2020-01-01 00:00:00| 453.0300000000001|
|   240|2020-01-01 00:00:00|             64.92|
|   220|2020-01-02 00:00:00|1357.4500000000003|
|   247|2020-01-06 00:00:00|           1525.06|
|    65|2020-01-06 00:00:00| 5526.510000000004|
|    29|2020-01-06 00:00:00|           1093.52|
|   228|2020-01-07 00:00:00|1204.3300000000002|
|   258|2020-01-07 00:00:00| 580.8900000000001|
|   196|2020-01-08 00:00:00|2726.0399999999977|
|    39|2020-01-09 00:00:00| 3862.879999999999|
|   228|2020-01-12 00:00:00|            761.86|
|    69|2020-01-12 00:00:00|1127.7299999999998|
|   120|2020-01-13 00:00:00|             21.36|
|   122|2020-01-14 00:00:00|            660.86|
|    63|2020-01-15 00:00:00|            319.04|
|   209|2020-01-15 00:00:00|            283.89|
|   255|2020-01-16 00:00:00|1993.8199999999986|
|   196|2020-01-16 00:00:00|2180.2699999

                                                                                

## Guardamos en GCLOUD

In [15]:
# Persist the aggregated totals to Google Cloud Storage as Parquet.
# coalesce(1) reduces the result to a single partition before writing.
# mode("overwrite") makes this cell re-runnable: with the default save mode the
# write raises once the target path exists, which breaks Restart & Run All
# after the first successful run.
# NOTE(review): "grenn" in the target name looks like a typo for "green", and
# this bucket differs from the one the input was read from — confirm both are
# intended. The path is left unchanged here so existing consumers keep working.
df_totalizado.coalesce(1).write.mode("overwrite").parquet("gs://projectonleali-mibucket/reportes/total_grenn.parquet")

                                                                                