## 1. Data Engineering - Process CSV files into BQ Tables via Spark
### This notebook converts a CSV and stores it in BigQuery 

### Create Spark session with BQ connector

Create a Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType

# Attach the BigQuery connector jar so Spark can read from and write to BigQuery
spark = SparkSession.builder \
    .appName('Spark - Data Eng Demo') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \
    .getOrCreate()

**TODO**
* Provide your bucket path and project ID below


In [2]:
BUCKET_PATH = "gs://thatistoomuchdata-data"
PROJECT_ID = "thatistoomuchdata"

In [3]:
!gsutil ls $BUCKET_PATH

gs://thatistoomuchdata-data/cluster-config.yaml
gs://thatistoomuchdata-data/transaction_data_join.csv
gs://thatistoomuchdata-data/transaction_data_test.csv
gs://thatistoomuchdata-data/transaction_data_train.csv
gs://thatistoomuchdata-data/google-cloud-dataproc-metainfo/
gs://thatistoomuchdata-data/model/
gs://thatistoomuchdata-data/workflows/


Check the first 1000 bytes of a file on GCS

In [4]:
!gsutil cat -h -r 0-1000 $BUCKET_PATH/transaction_data_train.csv

==> gs://thatistoomuchdata-data/transaction_data_train.csv <==
step,type,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,transactionID
192,DEBIT,2129.14,254447,252317.86,441844.59,443973.73,0,97b164f8-7ff8-4a77-a4e7-59bc2c42b74d
395,DEBIT,9910.44,24040,14129.56,2368599.34,2378509.78,0,80828313-2a6c-4ea1-8250-ed8e8ec5b390
589,DEBIT,2984.18,12732,9747.82,906466.61,909450.79,0,831843e6-7611-4a4e-a62d-d6de2dbe411c
266,DEBIT,1747.32,113637,111889.68,71894.4,73641.72,0,84449213-8dbb-4b16-a721-c31111d7f58c
241,DEBIT,64.6,1458,1393.4,3810660.18,3810724.78,0,14162c62-0eaf-40c2-8da9-12b76ce7c6f2
302,DEBIT,2000.33,359,0,121622.24,123622.57,0,3a3843df-bcb3-448c-8cc5-948de39d0ab7
191,DEBIT,2857.43,75295,72437.57,10683664.97,10686522.4,0,7fa9b44b-7cca-4eff-a638-922f562aeef9
594,DEBIT,1629.17,63823,62193.83,191210.18,192839.35,0,43ea6b96-c0d5-46d3-84aa-926ece6c535f
22,DEBIT,1405.44,738,0,5296399.49,5449726.04,0,f5a06ff1-201a-43c8-beaa-69f3ecdf457b
324,DEBIT,8907.19,2422,0,45

In [5]:
path_to_train_csv = BUCKET_PATH + "/transaction_data_train.csv"
print(path_to_train_csv)

gs://thatistoomuchdata-data/transaction_data_train.csv


### Get Spark application ID 

The application ID makes it easy to find this application in the Spark History UI.

In [6]:
spark.conf.get("spark.app.id")

'application_1621409951475_0001'

Load the CSV file into a Spark DataFrame

In [7]:
df_transaction_data_from_csv = spark \
.read \
.option("inferSchema" , "true") \
.option("header" , "true") \
.csv(path_to_train_csv)
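
With `inferSchema` enabled, Spark makes an extra pass over the CSV to guess the column types. A minimal sketch of an alternative, assuming the types that `printSchema()` reports for this file: build a DDL-formatted schema string and hand it to the reader. The names `TRANSACTION_COLUMNS` and `build_ddl_schema` are illustrative and not part of the original notebook.

```python
# Column names come from the CSV header; the types are an assumption based on
# what Spark inferred for this file (integer, string, double).
TRANSACTION_COLUMNS = [
    ("step", "INT"),
    ("type", "STRING"),
    ("amount", "DOUBLE"),
    ("oldbalanceOrg", "DOUBLE"),
    ("newbalanceOrig", "DOUBLE"),
    ("oldbalanceDest", "DOUBLE"),
    ("newbalanceDest", "DOUBLE"),
    ("isFraud", "INT"),
    ("transactionID", "STRING"),
]


def build_ddl_schema(columns):
    """Join (name, type) pairs into a DDL-formatted schema string.

    Column names are backtick-quoted so that names such as `type`
    cannot be mistaken for keywords.
    """
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns)


ddl_schema = build_ddl_schema(TRANSACTION_COLUMNS)
print(ddl_schema)

# With a live Spark session the string can replace inferSchema:
# df = spark.read.schema(ddl_schema).option("header", "true").csv(path_to_train_csv)
```

An explicit schema also makes the load fail loudly, or produce nulls, when a file does not match the expected layout, instead of silently inferring different types.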

In [8]:
df_transaction_data_from_csv.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- transactionID: string (nullable = true)



### Create the BQ dataset & table to persist data

**TODO** (Challenge 1)
* Create the BQ schema from the Spark DataFrame
    * Reference for converting data types: https://github.com/GoogleCloudDataproc/spark-bigquery-connector#data-types

schema_inline='step:int64,type:string,amount:float64,oldbalanceOrg:float64,newbalanceOrig:float64,oldbalanceDest:float64,newbalanceDest:float64,isFraud:int64,transactionID:string'

In [9]:
df_transaction_data_from_csv.schema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true),StructField(isFraud,IntegerType,true),StructField(transactionID,StringType,true)))

In [10]:
help(df_transaction_data_from_csv.schema)

Help on StructType in module pyspark.sql.types object:

class StructType(DataType)
 |  StructType(fields=None)
 |  
 |  Struct type, consisting of a list of :class:`StructField`.
 |  
 |  This is the data type representing a :class:`Row`.
 |  
 |  Iterating a :class:`StructType` will iterate its :class:`StructField`\s.
 |  A contained :class:`StructField` can be accessed by name or position.
 |  
 |  >>> struct1 = StructType([StructField("f1", StringType(), True)])
 |  >>> struct1["f1"]
 |  StructField(f1,StringType,true)
 |  >>> struct1[0]
 |  StructField(f1,StringType,true)
 |  
 |  Method resolution order:
 |      StructType
 |      DataType
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __getitem__(self, key)
 |      Access fields by name or slice.
 |  
 |  __init__(self, fields=None)
 |      >>> struct1 = StructType([StructField("f1", StringType(), True)])
 |      >>> struct2 = StructType([StructField("f1", StringType(), True)])
 |      >>> struct1 == struct2
 |  

In [11]:
# Caution: plain replace() also rewrites column names that contain "int" or "double"
schema_inline = df_transaction_data_from_csv.schema.simpleString().replace("struct<", "").replace(">", "") \
    .replace("int", "int64").replace("double", "float64")
schema_inline

'step:int64,type:string,amount:float64,oldbalanceOrg:float64,newbalanceOrig:float64,oldbalanceDest:float64,newbalanceDest:float64,isFraud:int64,transactionID:string'
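
The chained `replace()` calls work for this schema, but they operate on the whole string, so a column called `points` would come out as `point64s`. A minimal sketch of a safer variant, assuming the `(name, type)` pairs that `DataFrame.dtypes` returns: map each type through a lookup table and leave the column names untouched. `SPARK_TO_BQ` and `to_bq_inline_schema` are hypothetical names, and the mapping covers only the types seen in this notebook.

```python
# Assumed subset of the Spark-to-BigQuery type mapping, limited to the
# types that appear in this notebook; extend it for other data.
SPARK_TO_BQ = {
    "int": "int64",
    "bigint": "int64",
    "double": "float64",
    "string": "string",
}


def to_bq_inline_schema(dtypes):
    """Turn (column, spark_type) pairs into a name:type,name:type string."""
    parts = []
    for name, spark_type in dtypes:
        if spark_type not in SPARK_TO_BQ:
            raise ValueError(f"No BigQuery type mapped for {name}: {spark_type}")
        parts.append(f"{name}:{SPARK_TO_BQ[spark_type]}")
    return ",".join(parts)


# Same shape as df_transaction_data_from_csv.dtypes for this CSV.
sample_dtypes = [
    ("step", "int"), ("type", "string"), ("amount", "double"),
    ("oldbalanceOrg", "double"), ("newbalanceOrig", "double"),
    ("oldbalanceDest", "double"), ("newbalanceDest", "double"),
    ("isFraud", "int"), ("transactionID", "string"),
]
schema_inline_checked = to_bq_inline_schema(sample_dtypes)
print(schema_inline_checked)
```

Raising on an unmapped type is a deliberate choice here: it surfaces columns such as timestamps or decimals early instead of passing an invalid type to `bq mk`.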

In [12]:
df_transaction_data_from_csv.show(5)

+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+
|step| type| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|       transactionID|
+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+
| 192|DEBIT|2129.14|     254447.0|     252317.86|     441844.59|     443973.73|      0|97b164f8-7ff8-4a7...|
| 395|DEBIT|9910.44|      24040.0|      14129.56|    2368599.34|    2378509.78|      0|80828313-2a6c-4ea...|
| 589|DEBIT|2984.18|      12732.0|       9747.82|     906466.61|     909450.79|      0|831843e6-7611-4a4...|
| 266|DEBIT|1747.32|     113637.0|     111889.68|       71894.4|      73641.72|      0|84449213-8dbb-4b1...|
| 241|DEBIT|   64.6|       1458.0|        1393.4|    3810660.18|    3810724.78|      0|14162c62-0eaf-40c...|
+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+
only showing top 5 rows

In [13]:
# create the names of your BQ datasets
project_id = !gcloud config list --format 'value(core.project)' 2>/dev/null 
dataset_name_raw = project_id[0] + '-raw'
dataset_name_raw = dataset_name_raw.replace('-', '_')
dataset_name_raw

dataset_name_annotated = project_id[0] + '-annotated'
dataset_name_annotated = dataset_name_annotated.replace('-', '_')
dataset_name_annotated

dataset_name_enriched = project_id[0] + '-enriched'
dataset_name_enriched = dataset_name_enriched.replace('-', '_')
dataset_name_enriched

'thatistoomuchdata_enriched'
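
The three dataset names follow one pattern, so a small helper can build and validate them in one place. A minimal sketch, assuming the project ID is already available as a plain string; `dataset_name` is a hypothetical helper, not part of the original notebook.

```python
import re


def dataset_name(project_id, suffix):
    """Build a dataset ID such as 'my_project_raw' from a project ID."""
    # strip() guards against stray whitespace in either part.
    name = f"{project_id.strip()}_{suffix.strip()}".replace("-", "_")
    # BigQuery dataset IDs may contain only letters, digits and underscores.
    if not re.fullmatch(r"\w+", name, flags=re.ASCII):
        raise ValueError(f"Invalid dataset ID: {name!r}")
    return name


# 'thatistoomuchdata' is the project ID used earlier in this notebook.
dataset_names = [
    dataset_name("thatistoomuchdata", suffix)
    for suffix in ("raw", "annotated", "enriched")
]
print(dataset_names)
```

Validating the name before calling `bq mk` turns a confusing CLI error into an immediate Python exception.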

Create the BQ datasets, specifying the location

In [17]:
!bq --location=europe-west3 mk -d \
{dataset_name_raw}

BigQuery error in mk operation: Dataset 'thetraining-project:thetraining_project_raw' already exists.


In [18]:
!bq --location=europe-west3 mk -d \
{dataset_name_annotated}

BigQuery error in mk operation: Dataset 'thetraining-project:thetraining_project_annotated' already exists.


In [19]:
!bq --location=europe-west3 mk -d \
{dataset_name_enriched}

BigQuery error in mk operation: Dataset 'thetraining-project:thetraining_project_enriched' already exists.


In [20]:
# build the dataset.table path for the new table
bq_table_name = 'transaction_data_train'
bq_table_path = dataset_name_raw + '.' + bq_table_name
bq_table_path

'thetraining_project_raw.transaction_data_train'

Create the BQ table 

In [21]:
!bq mk --table \
{bq_table_path} \
{schema_inline}

BigQuery error in mk operation: Table 'thetraining-project:thetraining_project_raw.transaction_data_train' could not be created; a table with this name already exists.


#### Check that the table was created

In [22]:
table = project_id[0] + ":" + bq_table_path
df_transaction_data_from_bq = spark.read \
.format("bigquery") \
.option("table", table) \
.load()

In [23]:
df_transaction_data_from_bq.printSchema()

root
 |-- step: long (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: long (nullable = true)
 |-- transactionID: string (nullable = true)



In [24]:
df_transaction_data_from_bq.show()

+----+--------+---------+-------------+--------------+--------------+--------------+-------+--------------------+
|step|    type|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|       transactionID|
+----+--------+---------+-------------+--------------+--------------+--------------+-------+--------------------+
| 400|CASH_OUT|220500.08|        741.0|           0.0|     325847.14|     546347.23|      0|2a6b5876-910a-465...|
| 280|CASH_OUT|226093.64|      79708.0|           0.0|     133554.74|     359648.38|      0|10c85cb8-8443-4be...|
| 375|CASH_OUT| 20981.18|      95818.0|      74836.82|    4836404.39|    4857385.56|      0|41670257-18d4-4ce...|
| 250|CASH_OUT|406533.39|      31922.0|           0.0|    5213065.49|    5917097.27|      0|7ec36ae1-f1f8-467...|
|  38|CASH_OUT|384488.25|     599770.0|     215281.75|     552083.02|     936571.26|      0|ffc4e596-3a9d-485...|
| 206|CASH_OUT|213316.26|      17092.0|           0.0|     303531.59|     516847.85|    

Write the Spark DataFrame to the BQ table

In [25]:
# name of the existing GCS bucket the connector uses to stage data when writing the Spark df to the BQ table
gcs_bucket = project_id[0] + '-data'
gcs_bucket

'thetraining-project-data'

In [26]:
df_transaction_data_from_csv.write \
.format("bigquery") \
.option("table", table) \
.option("temporaryGcsBucket", gcs_bucket) \
.mode('overwrite') \
.save()

Check if the BQ table is populated 

In [27]:
df_transaction_data_from_bq = spark.read \
.format("bigquery") \
.option("table", table) \
.load()

In [28]:
df_transaction_data_from_bq.show()

+----+--------+----------+-------------+--------------+--------------+--------------+-------+--------------------+
|step|    type|    amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|       transactionID|
+----+--------+----------+-------------+--------------+--------------+--------------+-------+--------------------+
| 302|TRANSFER| 419955.64|      20978.0|           0.0|       30562.0|     751695.31|      0|e56869b0-721a-463...|
| 322|TRANSFER| 169566.23|       3832.0|           0.0|    2717714.64|    2887280.87|      0|761fa0fb-1026-490...|
| 540|TRANSFER|1348788.69|        675.0|           0.0|     171461.44|    1520250.13|      0|ec07894f-94e3-4a1...|
| 349|TRANSFER| 790428.92|         66.0|           0.0|     305371.89|    1095800.81|      0|bdd6d65f-896e-49e...|
|   8|TRANSFER|1746499.66|        209.0|           0.0|       75521.0|     2180688.8|      0|a0022e15-eb69-46d...|
| 156|TRANSFER|1350240.79|     112105.0|           0.0|    1972716.25|    332295

### Compute statistics for columns in table

In [29]:
df_transaction_data_from_bq.describe().show()

+-------+------------------+--------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+
|summary|              step|    type|           amount|    oldbalanceOrg|    newbalanceOrig|   oldbalanceDest|    newbalanceDest|             isFraud|       transactionID|
+-------+------------------+--------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+
|  count|           5090692| 5090692|          5090692|          5090692|           5090692|          5090692|           5090692|             5090692|             5090692|
|   mean|243.43184659374404|    null|179890.2022951485|834053.2275695818| 855323.9460816536|1101305.533638861|1225649.3290917506|0.001279786716619273|                null|
| stddev|142.32545787222787|    null|603890.0044859531|2888545.884476795|2924500.7445227555|3410172.142629792|3684651.5340019865| 0.03575121
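
Because `isFraud` holds only 0 and 1, its mean is the share of fraudulent rows. A short worked check, using the `count` and `mean` values copied from the `describe()` output above; the variable names are illustrative.

```python
import math

# Values copied from the describe() output above.
row_count = 5090692
is_fraud_mean = 0.001279786716619273

# The mean of a 0/1 column is the fraction of rows equal to 1,
# so count * mean recovers the number of fraudulent rows.
fraud_rows = round(row_count * is_fraud_mean)
fraud_rate_pct = 100 * is_fraud_mean

# For a 0/1 column the standard deviation is close to sqrt(p * (1 - p)).
expected_stddev = math.sqrt(is_fraud_mean * (1 - is_fraud_mean))

print(f"fraud rows: {fraud_rows}")
print(f"fraud rate: {fraud_rate_pct:.3f}%")
print(f"expected stddev: {expected_stddev:.5f}")
```

A fraud rate this low means the classes are heavily imbalanced, which is worth keeping in mind when the table is later used for training or evaluation.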