This project takes on the role of a Data Engineer at a fictional e-commerce startup: the task is to build a data pipeline that transports the web logs generated by the servers to Google BigQuery, which serves as the data warehouse. The data warehouse helps analysts gain insights such as the number of product pages visited by users, the number of items added to the cart per hour, or any other ad-hoc query needed for other business requirements. The project is built on Google Cloud Platform (GCP).
Since this is a fictional e-commerce startup, the real-time data stream is generated by this Python script. The script generates log entries simulating either a visit to a product page or the addition of a product to the shopping cart. If a log entry contains a PID value that is not null, it corresponds to a product added to the shopping cart. Each generated web log is published to a Kafka topic, which is consumed by a Spark Streaming application.
A typical web-log entry published to the Kafka topic carries the visit details together with the optional PID field.
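A minimal sketch of the generator and the kind of entry it publishes is shown below, assuming the `kafka-python` client; the field names (`ip`, `timestamp`, `page`, `PID`), the topic name, and the broker address are illustrative assumptions rather than the repository's actual schema.

```python
# Hypothetical sketch of the web-log generator. Field names, topic name and
# broker address are assumptions, not the repository's actual configuration.
import json
import random
import time
from datetime import datetime, timezone

from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",          # Kafka broker on the Dataproc master (assumed)
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

PRODUCT_IDS = ["P100", "P200", "P300"]

while True:
    is_cart_addition = random.random() < 0.3     # ~30% of events are cart additions
    entry = {
        "ip": f"192.168.0.{random.randint(1, 254)}",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "page": "/product/" + random.choice(PRODUCT_IDS),
        # A non-null PID marks a product added to the shopping cart;
        # a plain page visit carries PID = None (serialised as null).
        "PID": random.choice(PRODUCT_IDS) if is_cart_addition else None,
    }
    producer.send("weblogs", entry)              # publish to the Kafka topic
    time.sleep(1)
```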
- The Python script publishes the web logs to a Kafka topic hosted on a Kafka cluster (a Google Dataproc cluster).
- A Spark Streaming application reads the data stream from the Kafka topic and writes the resulting DataFrame to a Google Cloud Storage (GCS) bucket. The data is written in Parquet format every minute (a streaming sketch follows this list).
- A Hive external table is created to index the raw data written in the previous step. Apache Airflow (Google Cloud Composer) schedules a HiveQL query every hour that counts the entries for page visits and the entries for products added to the shopping cart. The output of this query is stored in another GCS bucket (an example DAG is sketched after this list).
- The GCS bucket to which the output of the previous step is written is monitored by a Google Cloud Function. Whenever data lands in this bucket, the Cloud Function loads it into a BigQuery table ** (a Cloud Function sketch follows this list).
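A minimal sketch of the Kafka-to-GCS streaming step, written with Spark Structured Streaming; the topic name, schema, broker address, GCS paths, and one-minute trigger are assumptions for illustration, not necessarily what the repository uses.

```python
# Hypothetical PySpark sketch: read the web-log stream from Kafka and write it
# to GCS as Parquet once a minute. Requires the spark-sql-kafka package on the cluster.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("weblog-stream").getOrCreate()

schema = StructType([
    StructField("ip", StringType()),
    StructField("timestamp", StringType()),
    StructField("page", StringType()),
    StructField("PID", StringType()),                      # null for plain page visits
])

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")   # assumed broker address
    .option("subscribe", "weblogs")                        # assumed topic name
    .load()
)

logs = raw.select(from_json(col("value").cast("string"), schema).alias("log")).select("log.*")

query = (
    logs.writeStream.format("parquet")
    .option("path", "gs://raw-weblogs-bucket/logs/")       # assumed GCS bucket
    .option("checkpointLocation", "gs://raw-weblogs-bucket/checkpoints/")
    .trigger(processingTime="1 minute")                    # micro-batch every minute
    .start()
)
query.awaitTermination()
```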
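A sketch of what the hourly Airflow DAG could look like, assuming the Google provider's `DataprocSubmitJobOperator`; the project, region, cluster name and script URI are placeholders, and if the scripts in `spark-scripts/` are PySpark jobs rather than plain HiveQL, a `pyspark_job` spec would replace the `hive_job` block.

```python
# Hypothetical Airflow DAG sketch: submit the hourly HiveQL job to the Hive
# cluster on Dataproc. Project, region, cluster and script URI are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

with DAG(
    dag_id="hourly_visit_count",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    visit_count = DataprocSubmitJobOperator(
        task_id="run_visit_count_query",
        project_id="my-gcp-project",                            # placeholder project
        region="us-central1",                                   # placeholder region
        job={
            "reference": {"project_id": "my-gcp-project"},
            "placement": {"cluster_name": "hive-cluster"},      # placeholder cluster
            "hive_job": {
                # HiveQL script copied to GCS (bucket is a placeholder)
                "query_file_uri": "gs://scripts-bucket/spark-scripts/hive-sql-visitcnt-script.py",
            },
        },
    )
```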
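A hedged sketch of the Cloud Function, assuming a GCS object-finalize trigger and the `google-cloud-bigquery` client; the dataset/table names and the CSV source format are placeholders, while the WRITE_TRUNCATE disposition mirrors the note marked ** below.

```python
# Hypothetical Cloud Function sketch: triggered when an object is finalised in
# the output bucket, it loads that object into a BigQuery table.
from google.cloud import bigquery

BQ_TABLE = "my-gcp-project.ecommerce.hourly_counts"    # placeholder table

def load_to_bigquery(event, context):
    """Background Cloud Function triggered by google.storage.object.finalize."""
    uri = f"gs://{event['bucket']}/{event['name']}"

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,       # assumed output format of the Hive query
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    load_job = client.load_table_from_uri(uri, BQ_TABLE, job_config=job_config)
    load_job.result()                                  # wait for the load to finish
```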
- Use the bash scripts to bring up the following:
  - Spark Cluster (Kafka also runs in this cluster): `bash-scripts/create-spark-cluster.sh`
  - Hive Cluster: `bash-scripts/create-hive-cluster.sh`
- Create an Airflow environment using Google Cloud Composer and copy the DAGs into the environment from the `airflow/` folder. Also copy the HiveQL scripts `spark-scripts/hive-sql-visitcnt-script.py` and `spark-scripts/hive-sql-cart-script.py` into GCS so that the DAGs can use them.
(If you are on GCP's free trial, spinning up these clusters and the Airflow environment uses up the entire IP address quota, so shut down any other VMs that may already be running before spinning up these clusters.)
- Copy the Python data generator script from the `data/` folder onto the master node of the Spark cluster. Install the necessary dependencies and start the script.
- Create the Hive external table using `hive-scripts/create-hive-ext-table.sh`
- Start the Airflow DAGs from the Airflow WebUI
- Submit the Cloud Function: `bash-scripts/submit-cloud-function.sh`
- Bash scripts to create the Airflow Environment
- ** Currently, the WRITE_DISPOSITION is set to WRITE_TRUNCATE. However, this is a costly operation that will be improved in the future. The data is also partitioned by event date; since BigQuery allows at most 4000 partitions per table, this configuration can hold roughly 11 years (4000/365) of data. Replace the WRITE_TRUNCATE write to BigQuery with an approach that uses WRITE_APPEND to add only the newly processed data (one possible direction is sketched after this list).
- Connect BigQuery to Tableau to create visualizations of the data
- Add Data-Quality checks in the Airflow DAGs
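For the WRITE_APPEND item marked ** above, one possible direction, sketched under the same placeholder names as the Cloud Function example, is to append each hourly batch to a date-partitioned table instead of truncating it:

```python
# Hypothetical sketch for the WRITE_APPEND improvement: append each hourly
# batch to a date-partitioned table instead of truncating the whole table.
from google.cloud import bigquery

def append_batch(uri: str, table_id: str = "my-gcp-project.ecommerce.hourly_counts"):
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,       # assumed output format
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        # Daily partitioning on the (assumed) event_date column; at 4000
        # partitions per table this holds roughly 11 years of data.
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="event_date",
        ),
    )
    client.load_table_from_uri(uri, table_id, job_config=job_config).result()
```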