# Pipeline

In this notebook, the quotes relevant to our data story are extracted and stored as new files. We use PySpark to process the large yearly dataframes, then switch to pandas for a first pass over the filtered data.

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
%cd "/content/drive/My Drive/ADA/Project/"

Mounted at /content/drive
/content/drive/My Drive/ADA/Project


In [None]:
!pip install pyspark

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pathlib
import bz2
import json
import requests
from pyspark.sql import functions as f

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 63.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=944f9c49f42a55772fbe11a9ed03243345c6b2ed5332dcafe44281273883e918
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
from pyspark.sql import SparkSession

# Local-mode Spark session named "Colab" with the Spark UI port set to 4050.
# getOrCreate() hands back the already-running session if there is one.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
def fetch_quotes(pyspark_dataframe, year, keyword_list, filename):
  '''Select the quotes that mention at least one keyword and save them as JSON.

  A quote is kept when its 'quotation' column contains any of the keywords as
  a case-sensitive substring. Each matching quote is returned exactly once,
  no matter how many keywords it contains.

  Args:
    pyspark_dataframe: Spark dataframe of quotes; must have a 'quotation' column.
    year: year of the quotes. Not used by the function (the dataframe is passed
      in already loaded); kept so that existing calls remain valid.
    keyword_list: list of keyword strings to look for in the quotations.
    filename: name of the output file, written inside the 'data/' folder
      relative to the current working directory. pandas infers compression
      from the extension (e.g. '.json.bz2'); a plain '.json' is uncompressed.

  Returns:
    A pandas DataFrame holding the selected quotes (same columns as the input).
  '''

  print('loading...')

  print('reading keywords...')
  # transform the keywords to a single-column dataframe so Spark can join on it
  df_keywords = spark.createDataFrame(keyword_list, 'string').toDF('keywords')

  print('filtering the quotes...')
  # A left-semi join keeps a quote as soon as one keyword matches and never
  # duplicates it, unlike a left/inner join which emits one row per matching
  # keyword (a quote with both 'Tesla' and 'Model S' would be saved twice).
  # It also returns only the columns of the quotes dataframe.
  has_keyword = f.col('quotation').contains(f.col('keywords'))
  pyspark_dfquotes = pyspark_dataframe.join(f.broadcast(df_keywords), has_keyword, 'left_semi')

  print('converting to pandas dataframe...')
  # the filtered result is small enough to be collected on the driver
  pandas_dfquotes = pyspark_dfquotes.toPandas()

  print('write to compressed json file')
  # save the dataframe to a json file
  filename = 'data/' + filename
  pandas_dfquotes.to_json(path_or_buf=filename)

  return pandas_dfquotes

# 2015

In [None]:
year = 2015
# Absolute path of the notebook's folder: the parent of a bare file name is
# '.', so this resolves to the current working directory.
path = str(pathlib.Path('quote_extraction.ipynb').parent.resolve())
# Read the pre-processed quotes of that year into a Spark dataframe
path_to_file = f'{path}/data/quotes-{year}-process.json.bz2'
pyspark_dfquotes_2015 = spark.read.json(path_to_file)

In [None]:
# Tesla 2015: brand name plus the model names, in several capitalisations
keywords_2015_Tesla = [
    'Tesla', 'tesla',
    'Model S', 'model S', 'model s',
    'Model X', 'model X', 'model x',
    'Model Y', 'model Y', 'model y',
]
# recorded count on the last run: 2405 quotes
pandas_Tesla_2015 = fetch_quotes(
    pyspark_dfquotes_2015,
    year=2015,
    keyword_list=keywords_2015_Tesla,
    filename='Tesla-quotes-2015.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Chevrolet 2015: brand name and its Silverado model
keywords_2015_Chevy = [
    'Chevrolet', 'chevrolet',
    'Silverado', 'silverado',
]
# recorded count on the last run: 918 quotes
pandas_Chevy_2015 = fetch_quotes(
    pyspark_dfquotes_2015,
    year=2015,
    keyword_list=keywords_2015_Chevy,
    filename='Chevy-quotes-2015.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Toyota 2015: brand name only
keywords_2015_Toyota = [
    'Toyota',
    'toyota',
]
# recorded count: 918 quotes
# NOTE(review): identical to the Chevrolet 2015 count - possibly copied over; verify.
pandas_Toyota_2015 = fetch_quotes(
    pyspark_dfquotes_2015,
    year=2015,
    keyword_list=keywords_2015_Toyota,
    filename='Toyota-quotes-2015.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


# 2016

In [None]:
year = 2016
# Read the pre-processed quotes of that year into a Spark dataframe
# (`path` is the notebook folder computed in the 2015 loading cell)
path_to_file = f'{path}/data/quotes-{year}-process.json.bz2'
pyspark_dfquotes_2016 = spark.read.json(path_to_file)

In [None]:
# Tesla 2016: same keywords as 2015, plus the Model 3
keywords_2016_Tesla = [
    'Tesla', 'tesla',
    'Model S', 'model S', 'model s',
    'Model X', 'model X', 'model x',
    'Model Y', 'model Y', 'model y',
    'Model 3', 'model 3', 'model three',
]
# recorded count on the last run: 2199 quotes
pandas_Tesla_2016 = fetch_quotes(
    pyspark_dfquotes_2016,
    year=2016,
    keyword_list=keywords_2016_Tesla,
    filename='Tesla-quotes-2016.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Chevrolet 2016: brand name and its Silverado model
keywords_2016_Chevy = [
    'Chevrolet', 'chevrolet',
    'Silverado', 'silverado',
]
# recorded count on the last run: 577 quotes
pandas_Chevy_2016 = fetch_quotes(
    pyspark_dfquotes_2016,
    year=2016,
    keyword_list=keywords_2016_Chevy,
    filename='Chevy-quotes-2016.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Toyota 2016: brand name only
keywords_2016_Toyota = [
    'Toyota',
    'toyota',
]
pandas_Toyota_2016 = fetch_quotes(
    pyspark_dfquotes_2016,
    year=2016,
    keyword_list=keywords_2016_Toyota,
    filename='Toyota-quotes-2016.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


# 2017

In [None]:
  year = 2017
  # Load the dataframe corresponding to the year
  path_to_file = path + '/data/quotes-' + str(year) + '-process.json.bz2' 
  pyspark_dfquotes_2017 = spark.read.json(path_to_file)

In [None]:
# Tesla 2017: 2016 keywords plus the Roadster and Cybertruck
keywords_2017_Tesla = [
    'Tesla', 'tesla',
    'Model S', 'model S', 'model s',
    'Model X', 'model X', 'model x',
    'Model Y', 'model Y', 'model y',
    'Model 3', 'model 3', 'model three',
    'Tesla Roadster', 'Tesla roadster', 'tesla roadster',
    'Tesla Cybertruck', 'tesla Cybertruck', 'tesla cybertruck',
    'Cybertruck', 'cybertruck',
]
# recorded count on the last run: 4204 quotes
pandas_Tesla_2017 = fetch_quotes(
    pyspark_dfquotes_2017,
    year=2017,
    keyword_list=keywords_2017_Tesla,
    filename='Tesla-quotes-2017.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Chevrolet 2017: brand name and its Silverado model
keywords_2017_Chevy = [
    'Chevrolet', 'chevrolet',
    'Silverado', 'silverado',
]
# recorded count on the last run: 906 quotes
pandas_Chevy_2017 = fetch_quotes(
    pyspark_dfquotes_2017,
    year=2017,
    keyword_list=keywords_2017_Chevy,
    filename='Chevy-quotes-2017.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Toyota 2017: brand name only
keywords_2017_Toyota = [
    'Toyota',
    'toyota',
]
pandas_Toyota_2017 = fetch_quotes(
    pyspark_dfquotes_2017,
    year=2017,
    keyword_list=keywords_2017_Toyota,
    filename='Toyota-quotes-2017.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


# 2018

In [None]:
year = 2018
# Read the pre-processed quotes of that year into a Spark dataframe
# (`path` is the notebook folder computed in the 2015 loading cell)
path_to_file = f'{path}/data/quotes-{year}-process.json.bz2'
pyspark_dfquotes_2018 = spark.read.json(path_to_file)

In [None]:
# Tesla 2018: brand name, every model name, Roadster and Cybertruck
keywords_2018_Tesla = [
    'Tesla', 'tesla',
    'Model S', 'model S', 'model s',
    'Model X', 'model X', 'model x',
    'Model Y', 'model Y', 'model y',
    'Model 3', 'model 3', 'model three',
    'Tesla Roadster', 'Tesla roadster', 'tesla roadster',
    'Tesla Cybertruck', 'tesla Cybertruck', 'tesla cybertruck',
    'Cybertruck', 'cybertruck',
]
# recorded count on the last run: 5663 quotes
pandas_Tesla_2018 = fetch_quotes(
    pyspark_dfquotes_2018,
    year=2018,
    keyword_list=keywords_2018_Tesla,
    filename='Tesla-quotes-2018.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Chevrolet 2018: brand name and its Silverado model.
# The list gets its own 2018 name so that it no longer rebinds the
# 2016 Chevrolet keyword variable.
keywords_2018_Chevy = ['Chevrolet', 'chevrolet', 'Silverado', 'silverado']
pandas_Chevy_2018 = fetch_quotes (pyspark_dfquotes_2018, year = 2018, keyword_list = keywords_2018_Chevy , filename = 'Chevy-quotes-2018.json')  # 850 quotes

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Toyota 2018: brand name only
keywords_2018_Toyota = [
    'Toyota',
    'toyota',
]
pandas_Toyota_2018 = fetch_quotes(
    pyspark_dfquotes_2018,
    year=2018,
    keyword_list=keywords_2018_Toyota,
    filename='Toyota-quotes-2018.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


# 2019

In [None]:
year = 2019
# Read the pre-processed quotes of that year into a Spark dataframe
# (`path` is the notebook folder computed in the 2015 loading cell)
path_to_file = f'{path}/data/quotes-{year}-process.json.bz2'
pyspark_dfquotes_2019 = spark.read.json(path_to_file)

In [None]:
# Tesla 2019: brand name, every model name, Roadster and Cybertruck
keywords_2019_Tesla = [
    'Tesla', 'tesla',
    'Model S', 'model S', 'model s',
    'Model X', 'model X', 'model x',
    'Model Y', 'model Y', 'model y',
    'Model 3', 'model 3', 'model three',
    'Tesla Roadster', 'Tesla roadster', 'tesla roadster',
    'Tesla Cybertruck', 'tesla Cybertruck', 'tesla cybertruck',
    'Cybertruck', 'cybertruck',
]
# recorded count on the last run: 4167 quotes
pandas_Tesla_2019 = fetch_quotes(
    pyspark_dfquotes_2019,
    year=2019,
    keyword_list=keywords_2019_Tesla,
    filename='Tesla-quotes-2019.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Chevrolet 2019
# Use the same keyword variants as the 2015-2018 Chevrolet cells so the yearly
# counts are comparable: the lowercase spellings were missing here.
# 'Chevrolet Silverado' is dropped because any quote containing it already
# contains 'Chevrolet'.
keywords_2019_Chevy = ['Chevrolet', 'chevrolet', 'Silverado', 'silverado']
# 701 quotes were recorded with the previous capitalised-only list; re-run to refresh the count
pandas_Chevy_2019 = fetch_quotes (pyspark_dfquotes_2019, year = 2019, keyword_list = keywords_2019_Chevy , filename = 'Chevy-quotes-2019.json')

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Toyota 2019: brand name only
keywords_2019_Toyota = [
    'Toyota',
    'toyota',
]
pandas_Toyota_2019 = fetch_quotes(
    pyspark_dfquotes_2019,
    year=2019,
    keyword_list=keywords_2019_Toyota,
    filename='Toyota-quotes-2019.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


# 2020

In [None]:
year = 2020
# Read the pre-processed quotes of that year into a Spark dataframe
# (`path` is the notebook folder computed in the 2015 loading cell)
path_to_file = f'{path}/data/quotes-{year}-process.json.bz2'
pyspark_dfquotes_2020 = spark.read.json(path_to_file)

In [None]:
# Tesla 2020: brand name, every model name, Roadster and Cybertruck
keywords_2020_Tesla = [
    'Tesla', 'tesla',
    'Model S', 'model S', 'model s',
    'Model X', 'model X', 'model x',
    'Model Y', 'model Y', 'model y',
    'Model 3', 'model 3', 'model three',
    'Tesla Roadster', 'Tesla roadster', 'tesla roadster',
    'Tesla Cybertruck', 'tesla Cybertruck', 'tesla cybertruck',
    'Cybertruck', 'cybertruck',
]
# recorded count on the last run: 942 quotes
pandas_Tesla_2020 = fetch_quotes(
    pyspark_dfquotes_2020,
    year=2020,
    keyword_list=keywords_2020_Tesla,
    filename='Tesla-quotes-2020.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Chevrolet 2020
# Use the same keyword variants as the 2015-2018 Chevrolet cells so the yearly
# counts are comparable: the lowercase spellings were missing here.
# 'Chevrolet Silverado' is dropped because any quote containing it already
# contains 'Chevrolet'.
keywords_2020_Chevy = ['Chevrolet', 'chevrolet', 'Silverado', 'silverado']
# 94 quotes were recorded with the previous capitalised-only list; re-run to refresh the count
pandas_Chevy_2020 = fetch_quotes (pyspark_dfquotes_2020, year = 2020, keyword_list = keywords_2020_Chevy , filename = 'Chevy-quotes-2020.json')

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file


In [None]:
# Toyota 2020: brand name only
keywords_2020_Toyota = [
    'Toyota',
    'toyota',
]
pandas_Toyota_2020 = fetch_quotes(
    pyspark_dfquotes_2020,
    year=2020,
    keyword_list=keywords_2020_Toyota,
    filename='Toyota-quotes-2020.json',
)

loading...
reading keywords...
filtering the quotes...
converting to pandas dataframe...
write to compressed json file
