In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m429.6 kB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=bdef52bd54a2be2e40183bdbf1b466e75c6a6e35956ee36d0a17d0d1a343f29b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql import functions as F
import requests

In [None]:
# Start a Spark session for this notebook, or attach to one that already exists.
spark = (
    SparkSession.builder
    .appName('Dataproc Transformation')
    .getOrCreate()
)
# Handle to the underlying SparkContext, used for RDD-level calls.
sc = spark.sparkContext

def get_data(url: str, schema: StructType = None,
             separator: str = '\t',
             header: bool = False,
             timeout: float = 30.0) -> DataFrame:
  """Download a delimited text file over HTTP and load it as a Spark DataFrame.

  Args:
    url: Address of the file to download.
    schema: Optional explicit schema; when omitted Spark infers column types.
    separator: Field delimiter used in the file.
    header: Whether the first line of the file holds the column names.
    timeout: Seconds to wait for the HTTP server before giving up.

  Returns:
    A DataFrame built from the downloaded lines.

  Raises:
    requests.HTTPError: If the server answers with a 4xx/5xx status.
    ValueError: If ``header`` is set but the response body is empty.
  """
  # Fail fast on HTTP errors instead of parsing an error page as CSV, and do
  # not wait indefinitely on an unresponsive server.
  response = requests.get(url, timeout=timeout)
  response.raise_for_status()

  lines = response.text.splitlines()
  infer_schema = not schema

  if not header:
    return spark.read.csv(sc.parallelize(lines), sep=separator, schema=schema,
                          inferSchema=infer_schema)

  if not lines:
    raise ValueError(f'No header line found: {url} returned an empty body')

  # The header line stays in `lines`: header=True tells Spark to treat the
  # first record as column names.
  column_names = lines[0].split(separator)
  df = spark.read.csv(sc.parallelize(lines), sep=separator, schema=schema,
                      inferSchema=infer_schema, header=True)
  if not schema:
    # Apply the names exactly as they appear in the raw header line.
    df = df.toDF(*column_names)
  return df

In [None]:
# Load the orders CSV (comma-separated, first line is the header) and preview 5 rows.
order_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/orders.csv'

order_df = get_data(order_url, separator=',', header=True)
order_df.show(5)

+-------+----------+----------+-------------------+-------------------+--------------------+-------+-------+--------------------+-------------------+--------------+----------+--------------+-----------+
|orderID|customerID|employeeID|          orderDate|       requiredDate|         shippedDate|shipVia|freight|            shipName|        shipAddress|      shipCity|shipRegion|shipPostalCode|shipCountry|
+-------+----------+----------+-------------------+-------------------+--------------------+-------+-------+--------------------+-------------------+--------------+----------+--------------+-----------+
|  10248|     VINET|         5|1996-07-04 00:00:00|1996-08-01 00:00:00|1996-07-16 00:00:...|      3|  32.38|Vins et alcools C...| 59 rue de l'Abbaye|         Reims|      NULL|         51100|     France|
|  10249|     TOMSP|         6|1996-07-05 00:00:00|1996-08-16 00:00:00|1996-07-10 00:00:...|      1|  11.61|  Toms Spezialitäten|      Luisenstr. 48|       Münster|      NULL|         4408

In [None]:
# Load the order_details CSV (comma-separated, first line is the header) and preview 5 rows.
order_details_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/order_details.csv'

order_details_df = get_data(order_details_url, separator=',', header=True)
order_details_df.show(5)

+-------+---------+---------+--------+--------+
|orderID|productID|unitPrice|quantity|discount|
+-------+---------+---------+--------+--------+
|  10248|       11|     14.0|      12|     0.0|
|  10248|       42|      9.8|      10|     0.0|
|  10248|       72|     34.8|       5|     0.0|
|  10249|       14|     18.6|       9|     0.0|
|  10249|       51|     42.4|      40|     0.0|
+-------+---------+---------+--------+--------+
only showing top 5 rows



In [None]:
# Load the products CSV (comma-separated, first line is the header) and preview 5 rows.
products_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/products.csv'

products_df = get_data(products_url, separator=',', header=True)
products_df.show(5)

+---------+--------------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|productID|         productName|supplierID|categoryID|    quantityPerUnit|unitPrice|unitsInStock|unitsOnOrder|reorderLevel|discontinued|
+---------+--------------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|        1|                Chai|         1|         1| 10 boxes x 20 bags|     18.0|          39|           0|          10|           0|
|        2|               Chang|         1|         1| 24 - 12 oz bottles|     19.0|          17|          40|          25|           0|
|        3|       Aniseed Syrup|         1|         2|12 - 550 ml bottles|     10.0|          13|          70|          25|           0|
|        4|Chef Anton's Caju...|         2|         2|     48 - 6 oz jars|     22.0|          53|           0|           0|           0|
|        5|Chef Anton's Gumb...|         

In [None]:
# Load the customers CSV (comma-separated, first line is the header) and preview 5 rows.
customers_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/customers.csv'

customers_df = get_data(customers_url, separator=',', header=True)
customers_df.show(5)

+----------+--------------------+------------------+--------------------+--------------------+-----------+------+----------+-------+--------------+--------------+
|customerID|         companyName|       contactName|        contactTitle|             address|       city|region|postalCode|country|         phone|           fax|
+----------+--------------------+------------------+--------------------+--------------------+-----------+------+----------+-------+--------------+--------------+
|     ALFKI| Alfreds Futterkiste|      Maria Anders|Sales Representative|       Obere Str. 57|     Berlin|  NULL|     12209|Germany|   030-0074321|   030-0076545|
|     ANATR|Ana Trujillo Empa...|      Ana Trujillo|               Owner|Avda. de la Const...|México D.F.|  NULL|     05021| Mexico|  (5) 555-4729|  (5) 555-3745|
|     ANTON|Antonio Moreno Ta...|    Antonio Moreno|               Owner|     Mataderos  2312|México D.F.|  NULL|     05023| Mexico|  (5) 555-3932|          NULL|
|     AROUT|     Aroun

In [None]:
def show_nulls(df: DataFrame) -> None:
  """Print a one-row table holding the number of null values in each column of ``df``."""
  null_counts = []
  for name in df.columns:
    # when() without otherwise() yields null for non-matching rows and count()
    # skips nulls, so only the rows where the column is null are counted.
    null_marker = F.when(F.col(name).isNull(), name)
    null_counts.append(F.count(null_marker).alias(name))
  df.select(null_counts).show()

In [None]:
# Per-column null counts for the products table.
show_nulls(products_df)

+---------+-----------+----------+----------+---------------+---------+------------+------------+------------+------------+
|productID|productName|supplierID|categoryID|quantityPerUnit|unitPrice|unitsInStock|unitsOnOrder|reorderLevel|discontinued|
+---------+-----------+----------+----------+---------------+---------+------------+------------+------------+------------+
|        0|          0|         0|         0|              0|        0|           0|           0|           0|           0|
+---------+-----------+----------+----------+---------------+---------+------------+------------+------------+------------+



In [None]:
# Per-column null counts for the orders table.
# NOTE(review): the orders preview prints NULL under shipRegion, yet this check
# reports 0 nulls — presumably the file stores the literal text "NULL"; confirm.
show_nulls(order_df)

+-------+----------+----------+---------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+
|orderID|customerID|employeeID|orderDate|requiredDate|shippedDate|shipVia|freight|shipName|shipAddress|shipCity|shipRegion|shipPostalCode|shipCountry|
+-------+----------+----------+---------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+
|      0|         0|         0|        0|           0|          0|      0|      0|       0|          0|       0|         0|             0|          0|
+-------+----------+----------+---------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+



In [None]:
# Per-column null counts for the order_details table.
show_nulls(order_details_df)

+-------+---------+---------+--------+--------+
|orderID|productID|unitPrice|quantity|discount|
+-------+---------+---------+--------+--------+
|      0|        0|        0|       0|       0|
+-------+---------+---------+--------+--------+



In [None]:
# Per-column null counts for the customers table.
# NOTE(review): the customers preview prints NULL under region and fax, yet this
# check reports 0 nulls — presumably the file stores the literal text "NULL"; confirm.
show_nulls(customers_df)

+----------+-----------+-----------+------------+-------+----+------+----------+-------+-----+---+
|customerID|companyName|contactName|contactTitle|address|city|region|postalCode|country|phone|fax|
+----------+-----------+-----------+------------+-------+----+------+----------+-------+-----+---+
|         0|          0|          0|           0|      0|   0|     0|         0|      0|    0|  0|
+----------+-----------+-----------+------------+-------+----+------+----------+-------+-----+---+



In [None]:
# products and order_details both carry a unitPrice column; give each copy its
# own name so the joined frame keeps the two prices apart.
products_df_to_mr = (
    products_df
    .withColumn('unitPriceForProduct', products_df['unitPrice'])
    .drop('unitPrice')
)
order_details_df_to_mr = (
    order_details_df
    .withColumn('unitPriceForOrder', order_details_df['unitPrice'])
    .drop('unitPrice')
)

# One row per order line, enriched with its order and its product.
full_order_details_df = (
    order_details_df_to_mr
    .join(order_df, 'orderID', 'left')
    .join(products_df_to_mr, 'productID', 'left')
)
full_order_details_df.show(10)
full_order_details_df.count()

+---------+-------+--------+--------+-----------------+----------+----------+-------------------+-------------------+--------------------+-------+-------+--------------------+------------------+--------------+----------+--------------+-----------+--------------------+----------+----------+-----------------+------------+------------+------------+------------+-------------------+
|productID|orderID|quantity|discount|unitPriceForOrder|customerID|employeeID|          orderDate|       requiredDate|         shippedDate|shipVia|freight|            shipName|       shipAddress|      shipCity|shipRegion|shipPostalCode|shipCountry|         productName|supplierID|categoryID|  quantityPerUnit|unitsInStock|unitsOnOrder|reorderLevel|discontinued|unitPriceForProduct|
+---------+-------+--------+--------+-----------------+----------+----------+-------------------+-------------------+--------------------+-------+-------+--------------------+------------------+--------------+----------+--------------+---

2155

In [None]:
# Sum of unitPrice * quantity over all order lines of each customer.
# NOTE(review): the discount column is not applied here — confirm that is intended.
customer_purchases_df = (
    order_details_df
    .join(order_df, 'orderId', 'left')
    .groupBy('customerId')
    .agg(F.sum(F.col('unitPrice') * F.col('quantity')).alias('total_spent'))
)
customer_purchases_df.show()

+----------+------------------+
|customerId|       total_spent|
+----------+------------------+
|     WOLZA|           3531.95|
|     MAISD|          10430.58|
|     BLAUS|            3239.8|
|     MAGAA| 7603.849999999999|
|     FOLKO|          32555.55|
|     ISLAT|            6146.3|
|     VAFFE|           16643.8|
|     ANATR|1402.9499999999998|
|     BLONP|           19088.0|
|     CENTC|             100.8|
|     TRAIH|1571.1999999999998|
|     SPLIR|12489.699999999999|
|     LILAS|17825.059999999998|
|     WARTH|           16617.1|
|     FRANR|           3172.16|
|     SEVES|          17172.05|
|     HILAA|23611.579999999998|
|     EASTC|          15033.66|
|     HANAR|          34101.15|
|     DRACD|           3763.21|
+----------+------------------+
only showing top 20 rows



In [None]:
# Smallest and largest total_spent across all customers.
customer_purchases_df.select(F.min('total_spent'), F.max('total_spent')).show()

+----------------+------------------+
|min(total_spent)|  max(total_spent)|
+----------------+------------------+
|           100.8|117483.39000000001|
+----------------+------------------+



In [None]:
# Approximate 25th / 50th / 75th percentiles of total_spent (relative error 0.001).
quantiles = customer_purchases_df.approxQuantile('total_spent', [0.25, 0.5, 0.75], 0.001)
quantiles

[3361.0, 7555.6, 18138.45]

In [None]:
# Bucket customers by total_spent: below 1000 is 'low', 1000 up to (but not
# including) 15000 is 'medium', everything else is 'high'.
# NOTE(review): the cut-offs are fixed numbers, not the quartiles computed
# earlier in the notebook — confirm that is intended.
spending_category_expr = (
    F.when(F.col('total_spent') < 1000, 'low')
    .when((F.col('total_spent') >= 1000) & (F.col('total_spent') < 15000), 'medium')
    .otherwise('high')
)
customer_purchases_df.withColumn('spending_category', spending_category_expr).show()

+----------+------------------+-----------------+
|customerId|       total_spent|spending_category|
+----------+------------------+-----------------+
|     WOLZA|           3531.95|           medium|
|     MAISD|          10430.58|           medium|
|     BLAUS|            3239.8|           medium|
|     MAGAA| 7603.849999999999|           medium|
|     FOLKO|          32555.55|             high|
|     ISLAT|            6146.3|           medium|
|     VAFFE|           16643.8|             high|
|     ANATR|1402.9499999999998|           medium|
|     BLONP|           19088.0|             high|
|     CENTC|             100.8|              low|
|     TRAIH|1571.1999999999998|           medium|
|     SPLIR|12489.699999999999|           medium|
|     LILAS|17825.059999999998|             high|
|     WARTH|           16617.1|             high|
|     FRANR|           3172.16|           medium|
|     SEVES|          17172.05|             high|
|     HILAA|23611.579999999998|             high|


In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql import functions as F
import requests

# Start a Spark session for this job, or attach to one that already exists.
spark = (
    SparkSession.builder
    .appName('Dataproc Transformation')
    .getOrCreate()
)
# Handle to the underlying SparkContext, used for RDD-level calls.
sc = spark.sparkContext

def get_data(url: str, schema: StructType = None,
             separator: str = '\t',
             header: bool = False,
             timeout: float = 30.0) -> DataFrame:
  """Download a delimited text file over HTTP and load it as a Spark DataFrame.

  Args:
    url: Address of the file to download.
    schema: Optional explicit schema; when omitted Spark infers column types.
    separator: Field delimiter used in the file.
    header: Whether the first line of the file holds the column names.
    timeout: Seconds to wait for the HTTP server before giving up.

  Returns:
    A DataFrame built from the downloaded lines.

  Raises:
    requests.HTTPError: If the server answers with a 4xx/5xx status.
    ValueError: If ``header`` is set but the response body is empty.
  """
  # Fail fast on HTTP errors instead of parsing an error page as CSV, and do
  # not wait indefinitely on an unresponsive server.
  response = requests.get(url, timeout=timeout)
  response.raise_for_status()

  lines = response.text.splitlines()
  infer_schema = not schema

  if not header:
    return spark.read.csv(sc.parallelize(lines), sep=separator, schema=schema,
                          inferSchema=infer_schema)

  if not lines:
    raise ValueError(f'No header line found: {url} returned an empty body')

  # The header line stays in `lines`: header=True tells Spark to treat the
  # first record as column names.
  column_names = lines[0].split(separator)
  df = spark.read.csv(sc.parallelize(lines), sep=separator, schema=schema,
                      inferSchema=infer_schema, header=True)
  if not schema:
    # Apply the names exactly as they appear in the raw header line.
    df = df.toDF(*column_names)
  return df

# Load the four Northwind tables from their CSV files.
order_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/orders.csv'
order_df = get_data(order_url, separator=',', header=True)
order_details_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/order_details.csv'
order_details_df = get_data(order_details_url, separator=',', header=True)
products_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/products.csv'
products_df = get_data(products_url, separator=',', header=True)
customers_url = 'https://raw.githubusercontent.com/DaPlayfulQueen/DE_track_data/master/northwind/customers.csv'
customers_df = get_data(customers_url, separator=',', header=True)

# These statements run at module level, so they must not be indented (the
# previous indentation raised an IndentationError).
# Give each unitPrice column its own name before joining the tables.
products_df_to_mr = products_df.withColumn('unitPriceForProduct', products_df['unitPrice']).drop('unitPrice')
order_details_df_to_mr = order_details_df.withColumn('unitPriceForOrder', order_details_df['unitPrice']).drop('unitPrice')
full_order_details_df = order_details_df_to_mr.join(order_df, 'orderID', 'left') \
                                            .join(products_df_to_mr, 'productID', 'left')

# Aggregate first, then attach the customer attributes: joining customers
# before the groupBy would discard every customer column in the aggregation.
customer_purchases_df = full_order_details_df.groupBy('customerId').agg(F.sum(F.col('unitPriceForOrder') * F.col('quantity')).alias('total_spent'))
spending_category_expr = F.when(customer_purchases_df.total_spent < 1000, 'low') \
                          .when((customer_purchases_df.total_spent >= 1000) & (customer_purchases_df.total_spent < 15000), 'medium') \
                          .otherwise('high')
customer_purchases_df = customer_purchases_df.withColumn('spending_category', spending_category_expr).join(customers_df, 'customerId', 'left')

customer_purchases_df.show(5)
full_order_details_df.show(5)


+----------+-----------------+-----------------+--------------------+--------------------+--------------------+--------------------+---------+------+----------+-------+--------------+--------------+
|customerId|      total_spent|spending_category|         companyName|         contactName|        contactTitle|             address|     city|region|postalCode|country|         phone|           fax|
+----------+-----------------+-----------------+--------------------+--------------------+--------------------+--------------------+---------+------+----------+-------+--------------+--------------+
|     WOLZA|          3531.95|           medium|      Wolski  Zajazd|Zbyszek Piestrzen...|               Owner|     ul. Filtrowa 68| Warszawa|  NULL|    01-012| Poland| (26) 642-7012| (26) 642-7012|
|     MAISD|         10430.58|           medium|        Maison Dewey|     Catherine Dewey|         Sales Agent| Rue Joseph-Bens 532|Bruxelles|  NULL|    B-1180|Belgium|(02) 201 24 67|(02) 201 24 68|
|    

In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql import functions as F
import requests

def get_data_from_db(database_url: str, db_properties: dict, table_name: str) -> DataFrame:
  """Read one database table over JDBC into a Spark DataFrame.

  Args:
    database_url: JDBC connection URL of the database.
    db_properties: Connection properties handed to the JDBC reader.
    table_name: Name of the table to read.

  Returns:
    The DataFrame produced by ``spark.read.jdbc`` for the requested table.
  """
  # getOrCreate() returns the existing session when one is already running.
  spark = SparkSession.builder.appName('Dataproc Transformation').getOrCreate()
  return spark.read.jdbc(url=database_url, table=table_name, properties=db_properties)


def write_to_bigquery(df: DataFrame, dataset: str, table: str):
  """Save ``df`` to the BigQuery table ``dataset.table`` with the direct write method."""
  destination = f'{dataset}.{table}'
  writer = df.write.format("bigquery").option("writeMethod", "direct")
  writer.save(destination)

def round_double_columns(df):
    """Limit every double column of ``df`` to three decimal places.

    Args:
        df: DataFrame whose ``double`` columns should be reduced in precision.

    Returns:
        A DataFrame with the same columns, where each double column has been
        cast to ``decimal(18, 3)`` and back to ``double``.
    """
    double_columns = [col_name for col_name, data_type in df.dtypes if data_type == 'double']

    for col_name in double_columns:
        # Only `F` is imported in this file; a bare `col` would raise NameError.
        df = df.withColumn(col_name, F.col(col_name).cast('decimal(18, 3)').cast('double'))

    return df


def main():
  """Build the customer spending and order detail tables and write them to BigQuery.

  Reads the orders, order_details, products and customers tables over JDBC,
  joins order lines with their order and product, totals unitPrice * quantity
  per customer, labels each customer low / medium / high, and writes the
  results to ``northwind.customers`` and ``northwind.order_details``.

  Connection settings come from the environment so that no secret is stored in
  the notebook: NORTHWIND_DB_PASSWORD (required), NORTHWIND_DB_URL and
  NORTHWIND_DB_USER (optional overrides).

  Raises:
    RuntimeError: If NORTHWIND_DB_PASSWORD is not set.
  """
  import os

  database_password = os.environ.get('NORTHWIND_DB_PASSWORD')
  if not database_password:
    raise RuntimeError('Set the NORTHWIND_DB_PASSWORD environment variable before running this job')

  database_url = os.environ.get('NORTHWIND_DB_URL', "jdbc:postgresql://172.21.160.3:5432/northwind")
  database_properties = {
      "user": os.environ.get('NORTHWIND_DB_USER', "reader"),
      "password": database_password,
      "driver": "org.postgresql.Driver"
  }

  order_df = get_data_from_db(database_url, database_properties, 'orders')
  order_details_df = get_data_from_db(database_url, database_properties, 'order_details')
  products_df = get_data_from_db(database_url, database_properties, 'products')
  customers_df = get_data_from_db(database_url, database_properties, 'customers')

  # Give each unitPrice column its own name before joining the tables.
  products_df_to_mr = products_df.withColumn('unitPriceForProduct', products_df['unitPrice']).drop('unitPrice')
  order_details_df_to_mr = order_details_df.withColumn('unitPriceForOrder', order_details_df['unitPrice']).drop('unitPrice')
  full_order_details_df = order_details_df_to_mr.join(order_df, 'orderID', 'left') \
                                              .join(products_df_to_mr, 'productID', 'left')

  # Total per customer, then the spending label, then the customer attributes.
  customer_purchases_df = full_order_details_df.groupBy('customerId').agg(F.sum(F.col('unitPriceForOrder') * F.col('quantity')).alias('total_spent'))
  spending_category_expr = F.when(customer_purchases_df.total_spent < 1000, 'low') \
                            .when((customer_purchases_df.total_spent >= 1000) & (customer_purchases_df.total_spent < 15000), 'medium') \
                            .otherwise('high')
  customer_purchases_df = customer_purchases_df.withColumn('spending_category', spending_category_expr).join(customers_df, 'customerId', 'left')

  customer_purchases_df.show(5)
  full_order_details_df.show(5)

  write_to_bigquery(customer_purchases_df, 'northwind', 'customers')
  write_to_bigquery(full_order_details_df, 'northwind', 'order_details')

# Run the job only when this code is executed directly, not when it is imported.
if __name__ == '__main__':
  main()

In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DecimalType
from pyspark.sql import functions as F
import requests

def get_data_from_db(database_url: str, db_properties: dict, table_name: str) -> DataFrame:
  """Read one database table over JDBC into a Spark DataFrame.

  Args:
    database_url: JDBC connection URL of the database.
    db_properties: Connection properties handed to the JDBC reader.
    table_name: Name of the table to read.

  Returns:
    The DataFrame produced by ``spark.read.jdbc`` for the requested table.
  """
  # getOrCreate() returns the existing session when one is already running.
  spark = SparkSession.builder.appName('Dataproc Transformation').getOrCreate()
  return spark.read.jdbc(url=database_url, table=table_name, properties=db_properties)


def write_to_bigquery(df: DataFrame, dataset: str, table: str):
  """Save ``df`` to the BigQuery table ``dataset.table`` with the direct write method."""
  destination = f'{dataset}.{table}'
  writer = df.write.format("bigquery").option("writeMethod", "direct")
  writer.save(destination)

def cast_decimal_columns(df):
    """Return ``df`` with every DecimalType column cast to double; other columns are untouched."""
    # Decide which columns need the cast from the schema of the input frame.
    decimal_names = [
        name for name in df.columns
        if isinstance(df.schema[name].dataType, DecimalType)
    ]

    converted = df
    for name in decimal_names:
        converted = converted.withColumn(name, F.col(name).cast('double'))
    return converted


def main():
  """Build the customer spending and order detail tables and write them to BigQuery.

  Reads the orders, order_details, products and customers tables over JDBC,
  joins order lines with their order and product, totals unitPrice * quantity
  per customer, labels each customer low / medium / high, casts decimal
  columns to double, and writes the results to ``northwind.customers`` and
  ``northwind.order_details``.

  Connection settings come from the environment so that no secret is stored in
  the notebook: NORTHWIND_DB_PASSWORD (required), NORTHWIND_DB_URL and
  NORTHWIND_DB_USER (optional overrides).

  Raises:
    RuntimeError: If NORTHWIND_DB_PASSWORD is not set.
  """
  import os

  database_password = os.environ.get('NORTHWIND_DB_PASSWORD')
  if not database_password:
    raise RuntimeError('Set the NORTHWIND_DB_PASSWORD environment variable before running this job')

  database_url = os.environ.get('NORTHWIND_DB_URL', "jdbc:postgresql://172.21.160.3:5432/northwind")
  database_properties = {
      "user": os.environ.get('NORTHWIND_DB_USER', "reader"),
      "password": database_password,
      "driver": "org.postgresql.Driver"
  }

  order_df = get_data_from_db(database_url, database_properties, 'orders')
  order_details_df = get_data_from_db(database_url, database_properties, 'order_details')
  products_df = get_data_from_db(database_url, database_properties, 'products')
  customers_df = get_data_from_db(database_url, database_properties, 'customers')

  # Give each unitPrice column its own name before joining the tables.
  products_df_to_mr = products_df.withColumn('unitPriceForProduct', products_df['unitPrice']).drop('unitPrice')
  order_details_df_to_mr = order_details_df.withColumn('unitPriceForOrder', order_details_df['unitPrice']).drop('unitPrice')
  full_order_details_df = order_details_df_to_mr.join(order_df, 'orderID', 'left') \
                                              .join(products_df_to_mr, 'productID', 'left')

  # Total per customer, then the spending label, then the customer attributes.
  customer_purchases_df = full_order_details_df.groupBy('customerId').agg(F.sum(F.col('unitPriceForOrder') * F.col('quantity')).alias('total_spent'))
  spending_category_expr = F.when(customer_purchases_df.total_spent < 1000, 'low') \
                            .when((customer_purchases_df.total_spent >= 1000) & (customer_purchases_df.total_spent < 15000), 'medium') \
                            .otherwise('high')
  customer_purchases_df = customer_purchases_df.withColumn('spending_category', spending_category_expr).join(customers_df, 'customerId', 'left')

  # Decimal columns are converted to double before the frames are written out.
  customer_purchases_df = cast_decimal_columns(customer_purchases_df)
  full_order_details_df = cast_decimal_columns(full_order_details_df)

  customer_purchases_df.show(5)
  full_order_details_df.show(5)

  write_to_bigquery(customer_purchases_df, 'northwind', 'customers')
  write_to_bigquery(full_order_details_df, 'northwind', 'order_details')

# Run the job only when this code is executed directly, not when it is imported.
if __name__ == '__main__':
  main()

In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DecimalType
from pyspark.sql import functions as F
import requests

def get_data_from_db(database_url: str, db_properties: dict, table_name: str) -> DataFrame:
  """Read one database table over JDBC into a Spark DataFrame.

  Args:
    database_url: JDBC connection URL of the database.
    db_properties: Connection properties handed to the JDBC reader.
    table_name: Name of the table to read.

  Returns:
    The DataFrame produced by ``spark.read.jdbc`` for the requested table.
  """
  # getOrCreate() returns the existing session when one is already running.
  spark = SparkSession.builder.appName('Dataproc Transformation').getOrCreate()
  return spark.read.jdbc(url=database_url, table=table_name, properties=db_properties)


def write_to_bigquery(df: DataFrame, dataset: str, table: str):
  """Save ``df`` to the BigQuery table ``dataset.table`` with the direct write method."""
  destination = f'{dataset}.{table}'
  writer = df.write.format("bigquery").option("writeMethod", "direct")
  writer.save(destination)

def cast_decimal_columns(df):
    """Return ``df`` with every DecimalType column cast to double; other columns are untouched."""
    # Decide which columns need the cast from the schema of the input frame.
    decimal_names = [
        name for name in df.columns
        if isinstance(df.schema[name].dataType, DecimalType)
    ]

    converted = df
    for name in decimal_names:
        converted = converted.withColumn(name, F.col(name).cast('double'))
    return converted


def main():
  """Copy the four Northwind source tables into the ``northwind_raw`` BigQuery dataset.

  Reads the orders, order_details, products and customers tables over JDBC,
  casts decimal columns to double, previews two rows of each table and writes
  each one to the table of the same name in ``northwind_raw``.

  Connection settings come from the environment so that no secret is stored in
  the notebook: NORTHWIND_DB_PASSWORD (required), NORTHWIND_DB_URL and
  NORTHWIND_DB_USER (optional overrides).

  Raises:
    RuntimeError: If NORTHWIND_DB_PASSWORD is not set.
  """
  import os

  database_password = os.environ.get('NORTHWIND_DB_PASSWORD')
  if not database_password:
    raise RuntimeError('Set the NORTHWIND_DB_PASSWORD environment variable before running this job')

  database_url = os.environ.get('NORTHWIND_DB_URL', "jdbc:postgresql://172.21.160.3:5432/northwind")
  database_properties = {
      "user": os.environ.get('NORTHWIND_DB_USER', "reader"),
      "password": database_password,
      "driver": "org.postgresql.Driver"
  }

  order_df = get_data_from_db(database_url, database_properties, 'orders')
  order_details_df = get_data_from_db(database_url, database_properties, 'order_details')
  products_df = get_data_from_db(database_url, database_properties, 'products')
  customers_df = get_data_from_db(database_url, database_properties, 'customers')

  # Decimal columns are converted to double before the frames are written out.
  order_df = cast_decimal_columns(order_df)
  order_details_df = cast_decimal_columns(order_details_df)
  products_df = cast_decimal_columns(products_df)
  customers_df = cast_decimal_columns(customers_df)

  order_df.show(2)
  order_details_df.show(2)
  products_df.show(2)
  customers_df.show(2)

  write_to_bigquery(order_df, 'northwind_raw', 'orders')
  write_to_bigquery(order_details_df, 'northwind_raw', 'order_details')
  write_to_bigquery(products_df, 'northwind_raw', 'products')
  write_to_bigquery(customers_df, 'northwind_raw', 'customers')

# Run the job only when this code is executed directly, not when it is imported.
if __name__ == '__main__':
  main()