<a href="https://colab.research.google.com/github/carloslme/wizeline-bootcamp/blob/main/pyspark/user_behavior.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Common imports
import numpy as np
import os

# Parquet imports
import pyarrow as pa
import pyarrow.parquet as pq

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)


import pandas as pd
import seaborn as sns
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
# Install dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz 
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install fsspec
!pip install gcsfs

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf

sc = SparkContext()
spark = SparkSession.builder.getOrCreate()

In [36]:
from google.oauth2 import service_account
from google.cloud.storage import client
import io
import pandas as pd
from io import BytesIO
import json
import filecmp

In [37]:
credentials = service_account.Credentials.from_service_account_file(
    '/content/gcs_service_account.json',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = client.Client(
    credentials=credentials,
    project=credentials.project_id,
)

In [7]:
BUCKET = 'staging-layer-330021'

In [8]:
def download_file(local_filename, remote_filename):
    bucket = client.get_bucket(BUCKET)
    blob = bucket.blob(remote_filename)
    blob.download_to_filename(local_filename)

In [9]:
download_file("/content/user_purchase.csv","user_purchase.csv")

In [10]:
download_file("/content/part-00000-59bf5da5-e43a-4eba-8c9b-153c96ee2a85-c000.snappy.parquet", "reviews.parquet/part-00000-59bf5da5-e43a-4eba-8c9b-153c96ee2a85-c000.snappy.parquet")
download_file("/content/part-00001-59bf5da5-e43a-4eba-8c9b-153c96ee2a85-c000.snappy.parquet", "reviews.parquet/part-00001-59bf5da5-e43a-4eba-8c9b-153c96ee2a85-c000.snappy.parquet")

In [11]:
df_reviews = spark.read.options(header=True).parquet('*.parquet')

In [12]:
df_user_purchase = spark.read.options(header=True).csv('*.csv')

In [13]:
df_reviews.columns

['user_id', 'positive_review']

In [14]:
df_user_purchase.columns

['invoice_number',
 'stock_code',
 'detail',
 'quantity',
 'invoice_date',
 'unit_price',
 'customer_id',
 'country']

In [15]:
from pyspark.sql.functions import *

data = (
    df_reviews.join(df_user_purchase, df_reviews.user_id == df_user_purchase.customer_id).select(df_reviews["positive_review"], df_user_purchase["*"])
)

In [16]:
data.show(10)

+---------------+--------------+----------+--------------------+--------+------------+----------+-----------+--------------+
|positive_review|invoice_number|stock_code|              detail|quantity|invoice_date|unit_price|customer_id|       country|
+---------------+--------------+----------+--------------------+--------+------------+----------+-----------+--------------+
|              0|        536365|    85123A|WHITE HANGING HEA...|       6|1291191960.0|      2.55|      17850|United Kingdom|
|              1|        536365|    85123A|WHITE HANGING HEA...|       6|1291191960.0|      2.55|      17850|United Kingdom|
|              0|        536365|    85123A|WHITE HANGING HEA...|       6|1291191960.0|      2.55|      17850|United Kingdom|
|              0|        536365|    85123A|WHITE HANGING HEA...|       6|1291191960.0|      2.55|      17850|United Kingdom|
|              0|        536365|    85123A|WHITE HANGING HEA...|       6|1291191960.0|      2.55|      17850|United Kingdom|


In [16]:
# customerid   => user_purchase.customerid,
# amount_spent => SUM(user_purchase.quantity * user_purchase.unit_price),
# review_score => SUM(reviews.positive_review),
# review_count => COUNT(reviews.id),                            
# insert_date  => airflow timestamp

In [73]:
df_reviews.createOrReplaceTempView("reviews")
df_user_purchase.createOrReplaceTempView("user_purchase")

In [45]:
spark.sql(
    """
    SELECT 
      up.customer_id AS customer
      , SUM(up.quantity * up.unit_price) as amount_spent
      , SUM(r.positive_review) as review_score
      , COUNT(r.user_id) as review_count
      , current_date() as insert_date
    FROM reviews AS r
    LEFT JOIN user_purchase AS up ON r.user_id == up.customer_id
    GROUP BY 1
    LIMIT 10
    """
).show()

+--------+------------------+------------+------------+-----------+
|customer|      amount_spent|review_score|review_count|insert_date|
+--------+------------------+------------+------------+-----------+
|   15271|246096.18000000415|       12375|       27225| 2021-12-02|
|   15555| 575742.2000000098|       50875|      111925| 2021-12-02|
|   15574| 95505.99999999267|       10416|       22848| 2021-12-02|
|   16250| 37775.67999999979|         744|        2328| 2021-12-02|
|   17551| 36820.80000000055|        2021|        5160| 2021-12-02|
|   17757|  575305.470000109|       31906|       76426| 2021-12-02|
|   14525|426358.37000001606|       11026|       30098| 2021-12-02|
|   13174| 307056.2700000226|       12874|       34226| 2021-12-02|
|   14639|351328.46000000875|        7875|       20825| 2021-12-02|
|   14810|206700.12000000363|        8160|       25245| 2021-12-02|
+--------+------------------+------------+------------+-----------+



In [80]:
logic = spark.sql(
    """
    SELECT 
      CAST(up.customer_id AS INTEGER) AS customer
      , SUM(up.quantity * up.unit_price) as amount_spent
      , SUM(r.positive_review) as review_score
      , COUNT(r.user_id) as review_count
      , current_date() as insert_date
    FROM reviews AS r
    LEFT JOIN user_purchase AS up ON r.user_id == up.customer_id
    GROUP BY 1
    LIMIT 10
    """
)

In [81]:
logic.dtypes

[('customer', 'int'),
 ('amount_spent', 'double'),
 ('review_score', 'bigint'),
 ('review_count', 'bigint'),
 ('insert_date', 'date')]

In [82]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pd_df = logic.toPandas()

In [83]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client.from_service_account_json('/content/bq_service_account.json')

job_config = bigquery.LoadJobConfig(
    schema=[
            bigquery.SchemaField('customer', 'INTEGER', 'REQUIRED', None, ()),
            bigquery.SchemaField('amount_spent', 'NUMERIC', 'REQUIRED', None, ()),
            bigquery.SchemaField('review_score', 'INTEGER', 'REQUIRED', None, ()),
            bigquery.SchemaField('review_count', 'INTEGER', 'REQUIRED', None, ()),
            bigquery.SchemaField('insert_date', 'TIMESTAMP', 'REQUIRED', None, ())],
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "wizeline-bootcamp-330020.dwh.user_behavior_metric"
table = client.get_table(table_id)  # Make an API request.
schema = table.schema 

In [84]:
job = client.load_table_from_dataframe(
    pd_df, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)

ArrowInvalid: ignored

### Get table schema and data

In [42]:
table = client.get_table(table_id)  # Make an API request.

# View table properties
print(
    "Got table '{}.{}.{}'.".format(table.project, table.dataset_id, table.table_id)
)
print("Table schema: {}".format(table.schema))
print("Table description: {}".format(table.description))
print("Table has {} rows".format(table.num_rows))

Got table 'wizeline-bootcamp-330020.dwh.user_behavior_metric'.
Table schema: [SchemaField('customer', 'INTEGER', 'REQUIRED', None, ()), SchemaField('amount_spent', 'NUMERIC', 'REQUIRED', None, ()), SchemaField('review_score', 'INTEGER', 'REQUIRED', None, ()), SchemaField('review_count', 'INTEGER', 'REQUIRED', None, ()), SchemaField('insert_date', 'TIMESTAMP', 'REQUIRED', None, ())]
Table description: None
Table has 0 rows
