<a href="https://colab.research.google.com/github/Kibika/A-B-Testing/blob/main/extract_load.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ELT Pipeline
This project uses the dbt tool to build an ELT pipeline. The data is a large dataset of up to 10 million rows, available at https://anson.ucdavis.edu/~clarkf/ .

This notebook extracts the data from the website and loads it into a MySQL database. We use PySpark to read the data into a DataFrame and write it to MySQL.


In [None]:
!apt-get install openjdk-8-jdk

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin
  libgtk2.0-common libxxf86dga1 openjdk-8-jdk-headless openjdk-8-jre
  openjdk-8-jre-headless x11-utils
Suggested packages:
  gvfs openjdk-8-demo openjdk-8-source visualvm icedtea-8-plugin libnss-mdns
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin
  libgtk2.0-common libxxf86dga1 openjdk-8-jdk openjdk-8-jdk-headless
  openjdk-8-jre openjdk-8-jre-headless x11-utils
0 upgraded, 15 newly installed, 0 to remove and 37 not upgraded.
Need to get 43.5 MB of archives.
After this 

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz 

In [None]:
!pip install -q findspark

In [None]:
!pip install -U pyarrow

Collecting pyarrow
  Downloading pyarrow-5.0.0-cp37-cp37m-manylinux2014_x86_64.whl (23.6 MB)
     |████████████████████████████████| 23.6 MB 127 kB/s 
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 3.0.0
    Uninstalling pyarrow-3.0.0:
      Successfully uninstalled pyarrow-3.0.0
Successfully installed pyarrow-5.0.0


In [None]:
ls /usr/lib/jvm/

default-java@               java-11-openjdk-amd64/     java-8-openjdk-amd64/
java-1.11.0-openjdk-amd64@  java-1.8.0-openjdk-amd64@


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.add_packages('mysql:mysql-connector-java:8.0.11')
findspark.init()

In [None]:
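from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# spark = SparkSession.builder.getOrCreate()
# Start a local SparkContext; creating the SQLContext also sets up the
# session that RDD.toDF() relies on further down.
sc = SparkContext('local')
sqlContext = SQLContext(sc)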

In [None]:
# sc.stop()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# cd '/content/drive/MyDrive/speech'

Read in the unzipped text file and store in a dataframe.

In [None]:
# log_txt=sc.textFile("/content/drive/MyDrive/speech/I80_davis.txt")
log_txt=sc.textFile("/content/drive/MyDrive/speech/d04_text_station_raw_2016_08_22.txt")

In [None]:
log_txt.take(1)

['08/22/2016 00:00:06,400001,1,.0061,86,0,0,0,2,.0117,86,0,0,0,1,.0067,71,,,,,,,,,']

In [None]:
# header = log_txt.first()

In [None]:
temp_var = log_txt.map(lambda k: k.split(","))
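
To see what the `split(",")` step produces for a single record without starting Spark, here is a minimal plain-Python sketch. The sample line is a shortened, hypothetical record modeled on the `take(1)` output above, and `split_record` is an illustrative helper that is not part of the notebook.

```python
from typing import List


def split_record(line: str) -> List[str]:
    """Split one raw comma-separated record into its string fields."""
    # str.split keeps an empty string for every consecutive or trailing
    # comma, so missing readings become "" instead of disappearing.
    return line.split(",")


# Hypothetical, shortened record modeled on the first line of the file.
sample = "08/22/2016 00:00:06,400001,1,.0061,86,,"
fields = split_record(sample)
print(fields)
```

Every field is still a string at this point, and missing readings show up as empty strings, which is why the DataFrame built from these lists has blank cells in its trailing columns.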


In [None]:
# log_txt = log_txt.filter(lambda line: line != header)


In [None]:
log_df=temp_var.toDF()
log_df.show(2)

+-------------------+------+---+-----+---+---+-----+---+---+-----+---+---+---+---+---+-----+---+---+---+---+---+---+---+---+---+---+
|                 _1|    _2| _3|   _4| _5| _6|   _7| _8| _9|  _10|_11|_12|_13|_14|_15|  _16|_17|_18|_19|_20|_21|_22|_23|_24|_25|_26|
+-------------------+------+---+-----+---+---+-----+---+---+-----+---+---+---+---+---+-----+---+---+---+---+---+---+---+---+---+---+
|08/22/2016 00:00:06|400001|  1|.0061| 86|  0|    0|  0|  2|.0117| 86|  0|  0|  0|  1|.0067| 71|   |   |   |   |   |   |   |   |   |
|08/22/2016 00:00:06|400010|  1|.0067| 78|  3|.0206| 71|  4|.0228| 78|  3|.02| 65|   |     |   |   |   |   |   |   |   |   |   |   |
+-------------------+------+---+-----+---+---+-----+---+---+-----+---+---+---+---+---+-----+---+---+---+---+---+---+---+---+---+---+
only showing top 2 rows



In [None]:
# log_df.printSchema()

In [None]:
newColumns = ["utc_time_id", "source_id", "primary_link_source_flag", "avg_speed",
              "avg_flow", "avg_occ", "avg_freeflow_speed", "avg_travel_time",
              "high_quality_samples", "samples_below_100pct_ff", "samples_below_95pct_ff",
              "samples_below_90pct_ff", "samples_below_85pct_ff", "samples_below_80pct_ff",
              "samples_below_75pct_ff", "samples_below_70pct_ff", "samples_below_65pct_ff",
              "samples_below_60pct_ff", "samples_below_55pct_ff", "samples_below_50pct_ff",
              "samples_below_45pct_ff", "samples_below_40pct_ff", "samples_below_35pct_ff",
              "samples_below_30pct_ff", "samples_below_25pct_ff", "samples_below_20pct_ff"]

In [None]:
log_df = log_df.toDF(*newColumns)
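
`toDF(*newColumns)` assigns the names by position, so the list needs exactly one name per existing column. The plain-Python sketch below illustrates that positional pairing on a three-field example; `name_fields` and the sample values are assumptions made for illustration, not part of the notebook.

```python
from typing import Dict, List


def name_fields(names: List[str], fields: List[str]) -> Dict[str, str]:
    """Pair column names with field values by position."""
    # A length mismatch means a name would be attached to the wrong field
    # (or to none at all), so fail early instead of pairing silently.
    if len(names) != len(fields):
        raise ValueError(f"expected {len(names)} fields, got {len(fields)}")
    return dict(zip(names, fields))


names = ["utc_time_id", "source_id", "primary_link_source_flag"]
row = name_fields(names, ["08/22/2016 00:00:06", "400001", "1"])
print(row["source_id"])
```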

In [None]:
log_df.show(2)

+-------------------+---------+------------------------+---------+--------+-------+------------------+---------------+--------------------+-----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|        utc_time_id|source_id|primary_link_source_flag|avg_speed|avg_flow|avg_occ|avg_freeflow_speed|avg_travel_time|high_quality_samples|samples_below_100pct_ff|samples_below_95pct_ff|samples_below_90pct_ff|samples_below_85pct_ff|samples_below_80pct_ff|samples_below_75pct_ff|samples_below_70pct_ff|samples_below_65pct_ff|samples_below_60pct_ff|samples_below_55pct_ff|samples_below_50pct_ff|samples_below_45pct_ff|samples_below_40pct_ff|samples_below_35pct_ff|sample

Convert utc_time_id from a string into a timestamp so that it can be loaded into a MySQL DATETIME column.

In [None]:
from pyspark.sql.functions import to_timestamp

# Parse the 'MM/dd/yyyy HH:mm:ss' strings into a temporary timestamp column named 'test'.
log_df = log_df.withColumn('test', to_timestamp('utc_time_id','MM/dd/yyyy HH:mm:ss'))
log_df.show(2)

+-------------------+---------+------------------------+---------+--------+-------+------------------+---------------+--------------------+-----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-------------------+
|        utc_time_id|source_id|primary_link_source_flag|avg_speed|avg_flow|avg_occ|avg_freeflow_speed|avg_travel_time|high_quality_samples|samples_below_100pct_ff|samples_below_95pct_ff|samples_below_90pct_ff|samples_below_85pct_ff|samples_below_80pct_ff|samples_below_75pct_ff|samples_below_70pct_ff|samples_below_65pct_ff|samples_below_60pct_ff|samples_below_55pct_ff|samples_below_50pct_ff|samples_below_45pct_ff|samples_below_40pct_ff|samples_b
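
The same conversion can be checked on a single value without Spark. This is a minimal sketch using Python's `datetime`; the `strptime` format `%m/%d/%Y %H:%M:%S` is the counterpart of the Spark pattern `MM/dd/yyyy HH:mm:ss`, and `to_mysql_datetime` is a hypothetical helper name.

```python
from datetime import datetime

# strptime counterpart of the Spark pattern 'MM/dd/yyyy HH:mm:ss'.
RAW_FORMAT = "%m/%d/%Y %H:%M:%S"
# MySQL DATETIME literals are written as 'YYYY-MM-DD hh:mm:ss'.
MYSQL_FORMAT = "%Y-%m-%d %H:%M:%S"


def to_mysql_datetime(raw: str) -> str:
    """Re-format a raw timestamp string as a MySQL DATETIME literal."""
    parsed = datetime.strptime(raw, RAW_FORMAT)
    return parsed.strftime(MYSQL_FORMAT)


converted = to_mysql_datetime("08/22/2016 00:00:06")
print(converted)
```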

In [None]:
log_df = log_df.drop(log_df.utc_time_id)
log_df.show(2)

+---------+------------------------+---------+--------+-------+------------------+---------------+--------------------+-----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-------------------+
|source_id|primary_link_source_flag|avg_speed|avg_flow|avg_occ|avg_freeflow_speed|avg_travel_time|high_quality_samples|samples_below_100pct_ff|samples_below_95pct_ff|samples_below_90pct_ff|samples_below_85pct_ff|samples_below_80pct_ff|samples_below_75pct_ff|samples_below_70pct_ff|samples_below_65pct_ff|samples_below_60pct_ff|samples_below_55pct_ff|samples_below_50pct_ff|samples_below_45pct_ff|samples_below_40pct_ff|samples_below_35pct_ff|samples_below_30pct_ff|sam

In [None]:
log_df = log_df.withColumnRenamed("test", "utc_time_id")
log_df.show(20)

+---------+------------------------+---------+--------+-------+------------------+---------------+--------------------+-----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-------------------+
|source_id|primary_link_source_flag|avg_speed|avg_flow|avg_occ|avg_freeflow_speed|avg_travel_time|high_quality_samples|samples_below_100pct_ff|samples_below_95pct_ff|samples_below_90pct_ff|samples_below_85pct_ff|samples_below_80pct_ff|samples_below_75pct_ff|samples_below_70pct_ff|samples_below_65pct_ff|samples_below_60pct_ff|samples_below_55pct_ff|samples_below_50pct_ff|samples_below_45pct_ff|samples_below_40pct_ff|samples_below_35pct_ff|samples_below_30pct_ff|sam

In [None]:
log_df = log_df.select("utc_time_id", "source_id", "primary_link_source_flag", "avg_speed",
              "avg_flow", "avg_occ", "avg_freeflow_speed", "avg_travel_time",
              "high_quality_samples", "samples_below_100pct_ff", "samples_below_95pct_ff",
              "samples_below_90pct_ff", "samples_below_85pct_ff", "samples_below_80pct_ff",
              "samples_below_75pct_ff", "samples_below_70pct_ff", "samples_below_65pct_ff",
              "samples_below_60pct_ff", "samples_below_55pct_ff", "samples_below_50pct_ff",
              "samples_below_45pct_ff", "samples_below_40pct_ff", "samples_below_35pct_ff",
              "samples_below_30pct_ff", "samples_below_25pct_ff", "samples_below_20pct_ff")
log_df.show(2)

+-------------------+---------+------------------------+---------+--------+-------+------------------+---------------+--------------------+-----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|        utc_time_id|source_id|primary_link_source_flag|avg_speed|avg_flow|avg_occ|avg_freeflow_speed|avg_travel_time|high_quality_samples|samples_below_100pct_ff|samples_below_95pct_ff|samples_below_90pct_ff|samples_below_85pct_ff|samples_below_80pct_ff|samples_below_75pct_ff|samples_below_70pct_ff|samples_below_65pct_ff|samples_below_60pct_ff|samples_below_55pct_ff|samples_below_50pct_ff|samples_below_45pct_ff|samples_below_40pct_ff|samples_below_35pct_ff|sample

In [None]:
# log_df.printSchema()

Take the first 100,000 rows to test that the load works before handling the full dataset.

In [None]:
limited_df = log_df.limit(100000)
limited_df.show(2)

+-------------------+---------+------------------------+---------+--------+-------+------------------+---------------+--------------------+-----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|        utc_time_id|source_id|primary_link_source_flag|avg_speed|avg_flow|avg_occ|avg_freeflow_speed|avg_travel_time|high_quality_samples|samples_below_100pct_ff|samples_below_95pct_ff|samples_below_90pct_ff|samples_below_85pct_ff|samples_below_80pct_ff|samples_below_75pct_ff|samples_below_70pct_ff|samples_below_65pct_ff|samples_below_60pct_ff|samples_below_55pct_ff|samples_below_50pct_ff|samples_below_45pct_ff|samples_below_40pct_ff|samples_below_35pct_ff|sample

Load the rest of the dataset, that is, the rows of log_df that are not in limited_df.

In [None]:
log_df1 = log_df.subtract(limited_df)
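
`DataFrame.subtract` behaves like SQL `EXCEPT DISTINCT`: it is a set difference, so any row that also occurs in the first batch is removed and duplicate rows in the result collapse to one. The plain-Python sketch below mimics that behavior on a few made-up tuples; `except_distinct` is an illustrative helper, not a Spark API.

```python
from typing import List, Tuple

Row = Tuple[str, int]


def except_distinct(left: List[Row], right: List[Row]) -> List[Row]:
    """Return the distinct rows of `left` that do not occur in `right`."""
    excluded = set(right)
    seen = set()
    result = []
    for row in left:
        # Skip rows present in `right` and rows already emitted once.
        if row in excluded or row in seen:
            continue
        seen.add(row)
        result.append(row)
    return result


# Made-up rows containing exact duplicates.
all_rows = [("a", 1), ("b", 2), ("b", 2), ("c", 3), ("a", 1)]
first_batch = all_rows[:1]
remainder = except_distinct(all_rows, first_batch)
print(remainder)
```

If the source file contains exact duplicate records, the remainder may therefore hold fewer rows than the total row count minus 100,000.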

In [None]:
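# ngrok exposes the MySQL server on a public TCP endpoint that Colab can reach.
# Connector/J 8.x registers its driver class as com.mysql.cj.jdbc.Driver.
log_df1.repartition(10).write.format('jdbc').options(
      numPartitions = "5",  # caps concurrent JDBC connections; extra partitions are coalesced
      url='jdbc:mysql://8.tcp.ngrok.io:11051/sensor_db?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&maxAllowedPacket=16777216&rewriteBatchedStatements=true',
      batchsize="100000",  # rows sent per JDBC batch insert
      driver='com.mysql.cj.jdbc.Driver',
      dbtable='sensordata',
      user='',
      password='').mode('append').save()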
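
The long JDBC URL is easier to read and change when it is assembled from its parts. The following is a minimal sketch using only the standard library; `build_mysql_jdbc_url` is a hypothetical helper, and the parameter subset shown is taken from the URL above.

```python
from typing import Dict
from urllib.parse import urlencode


def build_mysql_jdbc_url(host: str, port: int, database: str,
                         params: Dict[str, str]) -> str:
    """Assemble a MySQL JDBC URL from its components."""
    # urlencode joins the key=value pairs with '&' in insertion order.
    query = urlencode(params)
    return f"jdbc:mysql://{host}:{port}/{database}?{query}"


params = {
    "useSSL": "false",
    "rewriteBatchedStatements": "true",
    "characterEncoding": "UTF-8",
}
url = build_mysql_jdbc_url("8.tcp.ngrok.io", 11051, "sensor_db", params)
print(url)
```

Because the ngrok host and port change between tunnels, keeping them as separate arguments avoids editing the query string each time.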

## Extraction


In [None]:
# !wget https://anson.ucdavis.edu/~clarkf/richards.csv.gz  --no-check-certificate | tar -xz

In [None]:
# !gzip -d /content/drive/MyDrive/speech/richards.csv.gz

In [None]:
# !tar -xf "/content/drive/MyDrive/speech/d04_text_station_raw_2016_08_22.txt.gz" -C "/content/drive/MyDrive/speech/sensor/"

In [None]:
# f = open("/content/drive/MyDrive/speech/d04_text_station_raw_2016_08_22.txt", "r")
# print(f.readlines(3))

In [None]:
# def read_lines(fname,n,mode='r+'):
# 	with open(fname) as f:
# 		for i in range(n):
# 			print(f.readline())

# read_lines('/content/drive/MyDrive/speech/d04_text_station_raw_2016_08_22.txt',5)