# Accessing Cloud Storage with Spark

### Create a Dataproc cluster with Jupyter

This notebook is designed to run on Google Cloud Dataproc.
Follow this tutorial to create the Dataproc cluster.

* [Tutorial - Install and run a Jupyter notebook on a Dataproc cluster](https://cloud.google.com/dataproc/docs/tutorials/jupyter-notebook)

### Python 3 Kernel

Use a Python 3 kernel (not PySpark) so that you can configure the SparkSession in the notebook.

### Create Spark Session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName('Cloud Storage') \
  .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/11 23:52:16 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/11/11 23:52:16 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/11/11 23:52:16 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/11/11 23:52:16 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


### Enable repl.eagerEval

This displays the result of a DataFrame at each step without having to call `df.show()`, and it also improves the formatting of the output.

In [2]:
if hasattr(__builtins__,'__IPYTHON__'):
    spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
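
Eager evaluation has two companion settings, `spark.sql.repl.eagerEval.maxNumRows` and `spark.sql.repl.eagerEval.truncate`, both of which default to 20 in Spark. The truncation limit is most likely why long timestamps appear as `2014-01-01 00:00:...` in the outputs further down. The sketch below shows one way to apply all three settings together; the helper `apply_conf` and the dictionary name are hypothetical and not part of the original notebook.

```python
# Eager-evaluation settings, shown here with Spark's default limits.
EAGER_EVAL_CONF = {
    "spark.sql.repl.eagerEval.enabled": "true",
    "spark.sql.repl.eagerEval.maxNumRows": "20",  # rows rendered per DataFrame
    "spark.sql.repl.eagerEval.truncate": "20",    # max characters per cell
}


def apply_conf(session, conf):
    """Set each key/value pair on a SparkSession-like object's runtime config.

    Hypothetical helper: it only relies on `session.conf.set(key, value)`.
    """
    for key, value in conf.items():
        session.conf.set(key, value)
    return session


# In the notebook this would be called as: apply_conf(spark, EAGER_EVAL_CONF)
```

Raising the `truncate` value should make the full timestamps visible, at the cost of wider output tables.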

### List the files in a GCS bucket

Use the [Python SDK](https://googleapis.dev/python/storage/latest/client.html) that comes installed on the Dataproc cluster. We will use a public dataset.

In [3]:
from google.cloud import storage

gcs_client = storage.Client()
bucket = gcs_client.bucket('solutions-public-assets')

list(bucket.list_blobs(prefix='time-series-master/'))

[<Blob: solutions-public-assets, time-series-master/GBPUSD_2014_01.csv, 1643821118808732>,
 <Blob: solutions-public-assets, time-series-master/GBPUSD_2014_02.csv, 1643821119500006>,
 <Blob: solutions-public-assets, time-series-master/readme.txt, 1643821238542920>]
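
The read step in this notebook selects files with the wildcard `GBPUSD_*.csv`. As a rough local illustration of which of the three listed objects that pattern picks up, the sketch below applies Python's `fnmatch` to the blob names from the listing. This is only an approximation: the glob rules Spark uses for `gs://` paths are not identical to `fnmatch` (for example, `fnmatch` lets `*` match `/`).

```python
from fnmatch import fnmatchcase

# Object names copied from the bucket listing above.
blob_names = [
    "time-series-master/GBPUSD_2014_01.csv",
    "time-series-master/GBPUSD_2014_02.csv",
    "time-series-master/readme.txt",
]

# Same wildcard as in the Spark read path, without the gs://bucket/ prefix.
pattern = "time-series-master/GBPUSD_*.csv"

# Keep only the names that match the pattern (case-sensitive).
matched = [name for name in blob_names if fnmatchcase(name, pattern)]
print(matched)
```

Only the two CSV files match, so `readme.txt` is left out of the DataFrame.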

### Read the CSV files from GCS into a Spark DataFrame

In [4]:
df1 = spark \
  .read \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")

df1.printSchema()



root
 |-- XYZ: string (nullable = true)
 |-- GBP/USD: string (nullable = true)
 |-- 2014-01-01 00:00:00.000000: string (nullable = true)
 |-- 1.4995: double (nullable = true)
 |-- 1.5005: double (nullable = true)

In [5]:
df1

XYZ,GBP/USD,2014-01-01 00:00:00.000000,1.4995,1.5005
XYZ,GBP/USD,2014-01-01 00:00:...,1.4988,1.4998
XYZ,GBP/USD,2014-01-01 00:00:...,1.4979,1.4989
XYZ,GBP/USD,2014-01-01 00:00:...,1.4993,1.5003
XYZ,GBP/USD,2014-01-01 00:00:...,1.4989,1.4999
XYZ,GBP/USD,2014-01-01 00:00:...,1.4998,1.5008
XYZ,GBP/USD,2014-01-01 00:00:...,1.5001,1.5011
XYZ,GBP/USD,2014-01-01 00:00:...,1.4991,1.5001
XYZ,GBP/USD,2014-01-01 00:00:...,1.4978,1.4988
XYZ,GBP/USD,2014-01-01 00:00:...,1.4974,1.4984
XYZ,GBP/USD,2014-01-01 00:00:...,1.4987,1.4997


If there is no header row with column names, as is the case with this dataset, or if the schema is not inferred correctly, read the CSV files from GCS and define the schema explicitly.

In [6]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, DateType

schema = StructType([
    StructField("venue", StringType()),
    StructField("currencies", StringType()),
    StructField("time_stamp", TimestampType()),
    StructField("bid", DoubleType()),
    StructField("ask", DoubleType())
])

df2 = spark \
  .read \
  .schema(schema) \
  .csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")

df2.printSchema()

root
 |-- venue: string (nullable = true)
 |-- currencies: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)
 |-- bid: double (nullable = true)
 |-- ask: double (nullable = true)
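
To see why the first read ended up with column names such as `XYZ` and `1.4995`, the following standard-library sketch reproduces the effect on a two-line sample. It is not how Spark parses CSV internally, only an analogy. The first sample line comes from the output shown earlier; the timestamp on the second line is a hypothetical value, and `FIELDS` and `parse_row` are illustrative names.

```python
import csv
import io
from datetime import datetime

# Line 1 is taken from the notebook output; line 2's timestamp is hypothetical.
sample = (
    "XYZ,GBP/USD,2014-01-01 00:00:00.000000,1.4995,1.5005\n"
    "XYZ,GBP/USD,2014-01-01 00:00:00.500000,1.4988,1.4998\n"
)

# Treating the first line as a header turns a data record into column names.
with_header = list(csv.DictReader(io.StringIO(sample)))


def parse_timestamp(text):
    """Parse timestamps of the form 'YYYY-MM-DD HH:MM:SS.ffffff'."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


# Explicit schema: column name plus a function that converts the raw string.
FIELDS = [
    ("venue", str),
    ("currencies", str),
    ("time_stamp", parse_timestamp),
    ("bid", float),
    ("ask", float),
]


def parse_row(values):
    """Convert one list of raw strings into a typed dict using FIELDS."""
    return {name: cast(value) for (name, cast), value in zip(FIELDS, values)}


typed = [parse_row(row) for row in csv.reader(io.StringIO(sample))]

print(len(with_header), list(with_header[0].keys()))
print(len(typed), typed[0])
```

With the header option, one record is consumed as column names; with an explicit schema, every line is kept as data and each column gets a meaningful name and type.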



View the top rows of the Spark DataFrame (eager evaluation displays up to 20 rows by default)

In [7]:
df2

venue,currencies,time_stamp,bid,ask
XYZ,GBP/USD,2014-01-01 00:00:00,1.4995,1.5005
XYZ,GBP/USD,2014-01-01 00:00:...,1.4988,1.4998
XYZ,GBP/USD,2014-01-01 00:00:...,1.4979,1.4989
XYZ,GBP/USD,2014-01-01 00:00:...,1.4993,1.5003
XYZ,GBP/USD,2014-01-01 00:00:...,1.4989,1.4999
XYZ,GBP/USD,2014-01-01 00:00:...,1.4998,1.5008
XYZ,GBP/USD,2014-01-01 00:00:...,1.5001,1.5011
XYZ,GBP/USD,2014-01-01 00:00:...,1.4991,1.5001
XYZ,GBP/USD,2014-01-01 00:00:...,1.4978,1.4988
XYZ,GBP/USD,2014-01-01 00:00:...,1.4974,1.4984


Print the dimensions of the DataFrame: the number of rows and the number of columns.

In [8]:
print((df2.count(), len(df2.columns)))



(2436683, 5)

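Add an hour column and filter the data to create a new DataFrame with only 1 day of data. Note that the filter's upper bound, `2014-01-02 00:00:10`, also admits the first 10 seconds of the following day.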

In [9]:
import pyspark.sql.functions as F

df3 = df2.withColumn("hour", F.hour(F.col("time_stamp"))) \
  .filter(df2['time_stamp'] >= F.lit('2014-01-01 00:00:00')) \
  .filter(df2['time_stamp'] < F.lit('2014-01-02 00:00:10')).cache()

df3

venue,currencies,time_stamp,bid,ask,hour
XYZ,GBP/USD,2014-01-01 00:00:00,1.4995,1.5005,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4988,1.4998,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4979,1.4989,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4993,1.5003,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4989,1.4999,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4998,1.5008,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.5001,1.5011,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4991,1.5001,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4978,1.4988,0
XYZ,GBP/USD,2014-01-01 00:00:...,1.4974,1.4984,0
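
The filter above is a time-window selection combined with an hour extraction. As a minimal plain-Python sketch of the same idea, the code below uses a half-open interval `[start, end)`, which is a common way to select exactly one calendar day. The sample timestamps and the helper `in_window` are hypothetical and chosen only to exercise the boundaries.

```python
from datetime import datetime


def in_window(ts, start, end):
    """Return True when start <= ts < end (half-open interval)."""
    return start <= ts < end


# Hypothetical timestamps around the window boundaries.
samples = [
    datetime(2013, 12, 31, 23, 59, 59),
    datetime(2014, 1, 1, 0, 0, 0),
    datetime(2014, 1, 1, 12, 30, 0),
    datetime(2014, 1, 2, 0, 0, 0),
    datetime(2014, 1, 2, 0, 0, 5),
]

start = datetime(2014, 1, 1)
one_day_end = datetime(2014, 1, 2)               # exactly one day
notebook_end = datetime(2014, 1, 2, 0, 0, 10)    # bound used in the cell above

# Keep (timestamp, hour) pairs, mirroring the added "hour" column.
one_day = [(ts, ts.hour) for ts in samples if in_window(ts, start, one_day_end)]
with_extra = [(ts, ts.hour) for ts in samples if in_window(ts, start, notebook_end)]

print(len(one_day), len(with_extra))
```

In this sample the exact one-day window keeps two timestamps, while the notebook's bound also keeps the two that fall in the first seconds of 2014-01-02.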


In [10]:
print((df3.count(), len(df3.columns)))



(41390, 6)

Group by "hour" and order by "total_bids"

In [11]:
import pyspark.sql.functions as F

df4 = df3 \
  .groupBy("hour") \
  .agg(F.sum('bid').alias('total_bids'))

df4.orderBy('total_bids', ascending=False)

hour,total_bids
12,4888.966399999975
13,4852.239699999989
14,4569.660199999988
15,4518.744800000002
8,2489.1048999999966
10,2431.141400000008
9,2424.1796000000018
18,2368.89479999999
19,2355.5363999999986
11,2347.3602999999907
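
For readers less familiar with `groupBy` and `agg`, the same group-sum-sort logic can be sketched in plain Python. The `(hour, bid)` pairs below are hypothetical values, not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical (hour, bid) pairs standing in for the rows of df3.
rows = [(12, 1.5), (12, 1.25), (8, 1.5), (13, 2.0), (13, 1.0), (13, 0.5)]

# Equivalent of groupBy("hour").agg(sum("bid")): accumulate bids per hour.
totals = defaultdict(float)
for hour, bid in rows:
    totals[hour] += bid

# Equivalent of orderBy("total_bids", ascending=False).
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranked)
```

Because a sum of bid quotes grows with the number of quotes in each hour, a ranking like this largely reflects how many quotes arrived in that hour rather than the price level.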


### Write the Spark DataFrame to Google Cloud Storage in CSV format

In [14]:
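# Assumes a bucket named after the project ID already exists
gcs_bucket = gcs_client.project

gcs_filepath = 'gs://{}/currency/hourly_bids.csv'.format(gcs_bucket)

# coalesce(1) produces a single part file inside the output directory
df4.coalesce(1).write \
  .option("header", "true") \
  .mode('overwrite') \
  .csv(gcs_filepath)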
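
The output path is built by formatting the bucket name into a `gs://` URI. Spark treats that path as a directory and writes part files inside it, so `hourly_bids.csv` ends up being a folder rather than a single file. A small helper for composing such URIs might look like the sketch below; `gcs_uri` and the bucket name `my-project-id` are hypothetical.

```python
def gcs_uri(bucket, *parts):
    """Join a bucket name and path segments into a gs:// URI.

    Leading and trailing slashes on each segment are stripped so that
    segments can be passed with or without them.
    """
    path = "/".join(part.strip("/") for part in parts if part)
    return f"gs://{bucket}/{path}" if path else f"gs://{bucket}"


# Hypothetical bucket name standing in for gcs_client.project.
output_uri = gcs_uri("my-project-id", "currency", "hourly_bids.csv")
readback_glob = gcs_uri("my-project-id", "currency", "*")

print(output_uri)
print(readback_glob)
```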

                                                                                

Read the CSV file into a new DataFrame to check that it was saved correctly

In [15]:
df5 = spark.read \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .csv(f'gs://{gcs_client.project}/currency/*')

df5

hour,total_bids
12,4888.966399999975
22,1512.3183999999997
1,1040.8376999999998
13,4852.239699999989
6,1662.2480999999957
16,2295.8521
3,1032.2795000000003
20,1572.7295999999976
5,1726.0291000000027
19,2355.5363999999986
