#### In this notebook we will:
- Use the *Google Scholar* data set in CSV format, with fields such as author_name, email, affiliation, coauthors_names and research_interest.
- Read the *research_interest* field from a CSV file in *Google Drive* into an RDD, then use *flatMap* and *reduceByKey* to count the occurrences of each research interest. *flatMap* applies a function that returns a sequence to every element of an RDD and flattens the results into a new RDD; here each row becomes a list of (interest, 1) pairs. *reduceByKey* merges the values that share a key (here, an interest) with a reducing operator; in our example the operator is `add`, which sums the per-row counts into a total count per interest.
- Apply aggregate functions and sort a DataFrame by a column.
- Create a new column and fill it by applying a *User Defined Function (UDF)* to a field of a Spark DataFrame.
- Convert an RDD into a DataFrame with or without a schema.
- Apply a filter to a Spark DataFrame.
- Write a Spark DataFrame as a single CSV file to a Google Drive folder.
- Introduce Spark *joins* on DataFrames.
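The flatMap/reduceByKey word count described above can be sketched without Spark. The plain-Python sketch below only mimics the semantics on a few `##`-delimited research_interest strings; it is not how Spark executes the job (Spark partitions the data and combines counts per partition before shuffling). The helper `reduce_by_key` is hypothetical, written for illustration.

```python
from operator import add

rows = [
    "data_mining##anomaly_detection",
    "artificial_intelligence##machine_learning##data_mining##graph_mining##security",
    "graph_mining##big_data_analytics##machine_learning",
]

# flatMap: each row yields several (term, 1) pairs, flattened into one list
pairs = [(w.lower(), 1) for row in rows for w in row.split("##")]


def reduce_by_key(func, kv_pairs):
    """Hypothetical stand-in for RDD.reduceByKey: merge values sharing a key with func."""
    acc = {}
    for key, value in kv_pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


# reduceByKey(add): sum the 1s per term
counts = reduce_by_key(add, pairs)
print(counts)
```

Because `add` is associative and commutative, Spark is free to apply it in any grouping across partitions and still get the same totals; that property is what makes it a valid reducing operator for `reduceByKey`.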

###### START OF PRE-REQUISITES

##### Use apt-get to install the basic packages needed to run PySpark

In [4]:
!apt-get update 

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:5 http://ppa.launchpad.net/cran/l

In [5]:
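!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# older Spark releases are moved from downloads.apache.org to archive.apache.org, so use the archive URL
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz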

In [6]:
is_distributed = False
!pip install -q findspark
!pip install pytorch_lightning

# !pip install elephas
# !pip install analytics-zoo
# !pip install bigdl

Collecting pytorch_lightning
  Downloading pytorch_lightning-1.5.9-py3-none-any.whl (527 kB)
Collecting torchmetrics>=0.4.1
  Downloading torchmetrics-0.7.0-py3-none-any.whl (396 kB)
Collecting future>=0.17.1
  Downloading future-0.18.2.tar.gz (829 kB)
Collecting pyDeprecate==0.3.1
  Downloading pyDeprecate-0.3.1-py3-none-any.whl (10 kB)
Collecting PyYAML>=5.1
  Downloading PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (596 kB)
Collecting fsspec[http]!=2021.06.0,>=2021.05.0
  Downloading fsspec-2022.1.0-py3-none-any.whl (133 kB)
Collecting setuptools==59.5.0
  Downloading setuptools-59.5.0-py3-none-any.whl (952 kB

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [8]:
import findspark
findspark.init()

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

##### Mount Google Drive

In [11]:
# mount Google Drive into the Linux VM running this Colab notebook
from google.colab import drive
drive.mount('/content/drive')
# list a few directories (to check access)
!ls -ltr /content/drive/MyDrive/dataset/
!ls -ltr /content/drive/MyDrive/data_processing/
!ls -ltr /content/drive/MyDrive/data_processing/results/top_10_corona_dist/
!ls -ltr /content/drive/MyDrive/ | head -2

Mounted at /content/drive
total 17
drwx------ 2 root root 4096 Jan  4 21:56  dataset_json
drwx------ 2 root root 4096 Jan  4 21:56  dataset_html
drwx------ 2 root root 4096 Jan  4 21:56  dataset_csv
drwx------ 2 root root 4096 Jan  4 21:59  dataset_xml
-rw------- 1 root root  143 Jan 18 06:19 'ReadME -> dataset.gdoc'
total 8
drwx------ 2 root root 4096 Jan  4 21:55 results
drwx------ 2 root root 4096 Jan  4 21:55 intermediate
total 1
-rw------- 1 root root 255 Jan 13 19:16 part-00000-de3a3f7b-bff7-49f8-a6ff-8f4cfa8e6fad-c000.csv
-rw------- 1 root root   0 Jan 13 19:16 _SUCCESS
total 3096140
-rw------- 1 root root       143 Apr  3  2012 api id s (1).gsheet


##### Variable Declarations

In [34]:
# data set: Google Scholar profiles
path_google_scholar = "/content/drive/MyDrive/dataset/dataset_csv/dataset-google-scholar/output.csv"
# data set: CDC Pfizer COVID-19 vaccine distribution by state
path_covid = "/content/drive/MyDrive/dataset/dataset_csv/dataset-covid/cdc-pfizer-covid-19-vaccine-distribution-by-state.csv"
# data set taken from https://github.com/nytimes/covid-19-data/blob/master/us-counties.csv
path_covid2 = "/content/drive/MyDrive/dataset/dataset_csv/dataset-covid-2/us-counties.csv"
# base output path
path_out_base_result = "/content/drive/MyDrive/data_processing/results/"

In [28]:
# output directory for the top 10 states by average weekly 1st-dose vaccine allocation
path_out_avg = path_out_base_result + "top_10_corona_dist"


In [14]:
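# IMPORT important libraries
# a __future__ import must be the first statement in the cell (it is a no-op on Python 3)
from __future__ import print_function

import os
import traceback

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import pytorch_lightning
import requests
import tensorflow as tf
import torch
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from requests import get
from tabulate import tabulate
from tensorflow import keras
from tensorflow.keras import layers


def pretty_print_pandas(title, df, n):
    """Print a title and the first n rows of df as a psql-style table.

    Intended for pandas DataFrames. A Spark DataFrame also works: its
    head(n) returns a list of Row objects, so the headers become the
    column positions 0, 1, ... instead of the column names.
    """
    print(f"{title}:")
    print(tabulate(df.head(n), headers="keys", tablefmt="psql"))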

##### Read a sample of the available data sets

Read the input files

In [42]:
# read google scholar
df_gs = spark \
        .read \
        .option("header", True) \
        .csv(path_google_scholar)

# read covid 
df_covid = spark \
        .read \
        .option("header", True) \
        .csv(path_covid)
df_covid.show(n=2)
# read covid 2
df_covid2 = spark \
        .read \
        .option("header", True) \
        .csv(path_covid2)
df_covid2.show(n=2)
c1 = df_covid.select(F.lower("jurisdiction")).distinct().count()
c2 = df_covid2.select(F.lower("state")).distinct().count()
print(f"c1: {c1} c2: {c2}")
# unique states from covid ds 1
df_covid.select(F.lower("jurisdiction")).distinct().show(n=10)
# unique states from covid ds 2
df_covid2.select("state").distinct().show(n=10)

+------------+--------------------+---------------------+---------------------+
|jurisdiction| week_of_allocations|_1st_dose_allocations|_2nd_dose_allocations|
+------------+--------------------+---------------------+---------------------+
| Connecticut|2021-06-21T00:00:...|                54360|                54360|
|       Maine|2021-06-21T00:00:...|                21420|                21420|
+------------+--------------------+---------------------+---------------------+
only showing top 2 rows

+----------+---------+----------+-----+-----+------+
|      date|   county|     state| fips|cases|deaths|
+----------+---------+----------+-----+-----+------+
|2020-01-21|Snohomish|Washington|53061|    1|     0|
|2020-01-22|Snohomish|Washington|53061|    1|     0|
+----------+---------+----------+-----+-----+------+
only showing top 2 rows

c1: 63 c2: 56
+-------------------+
|lower(jurisdiction)|
+-------------------+
|      west virginia|
|      new hampshire|
|    mariana islands|
|     



###### END OF PRE-REQUISITES

### Google Scholar -> calculate frequency of research interest

In [16]:
%%time
# CALCULATE FREQUENCY FOR EACH WORD IN RESEARCH INTEREST
# (a cell magic such as %%time must be the first line of the cell)
from operator import add
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# read into dataframe (df)
df_gs = spark.read.option("header", True).csv(path_google_scholar)
# drop rows whose research_interest is the literal string 'None'
# (nulls are dropped too: null != 'None' evaluates to null, which filter treats as false)
df_gs_clean = df_gs.filter("research_interest != 'None'")
# keep only the research_interest column, as an RDD of strings
rdd_ri = df_gs_clean.rdd.map(lambda x: x["research_interest"])
print("\nSample RDD rows:")
print(rdd_ri.take(5))
print("\nSample RDD rows after frequency count for each word:")
# flatMap() turns each '##'-separated row into several (word, 1) pairs;
# reduceByKey(add) then sums the 1s per word
rdd_ri_freq = rdd_ri.flatMap(lambda x: [(w.lower(), 1) for w in x.split('##')]).reduceByKey(add)
# take() returns the first n elements of the RDD to the driver
print(rdd_ri_freq.take(5))

# approach 1: convert to df without a schema (default column names _1 and _2)
df_ri_freq = rdd_ri_freq.toDF()

pretty_print_pandas("RI freq without schema", df_ri_freq, 10)

# approach 2: convert to df with an explicit schema
schema = StructType([StructField("ri", StringType(), False),
                     StructField("frequency", IntegerType(), False)
])
# convert rdd to df with schema
df = spark.createDataFrame(rdd_ri_freq, schema)
print("\nProposed Schema of DF:")
# print schema (to verify)
df.printSchema()
print("\nRDD converted to DF with schema:")
# sort by frequency, highest first
df_sort = df.sort(F.col("frequency").desc())
df_sort.show(10, truncate=False)



Sample RDD rows:
['data_mining##anomaly_detection', 'artificial_intelligence##machine_learning##data_mining##graph_mining##security', 'machine_learning##never_ending_learning##lifelong_machine_learning##medical_informatics', 'graph_mining##big_data_analytics##machine_learning', 'network_security##cyber_physical_systems_security##cyber_education_and_workforce_development']

Sample RDD rows after frequency count for each word:
[('data_mining', 63), ('anomaly_detection', 5), ('artificial_intelligence', 123), ('machine_learning', 198), ('graph_mining', 5)]
RI freq without schema:
+---------------------------+-----+
| 0                         |   1 |
|---------------------------+-----|
| data_mining               |  63 |
| anomaly_detection         |   5 |
| artificial_intelligence   | 123 |
| machine_learning          | 198 |
| graph_mining              |   5 |
| security                  |  25 |
| never_ending_learning     |   1 |
| lifelong_machine_learning |   1 |
| medical_informatic

In [17]:
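# This example reads every column of the Google Scholar file as raw text lines
# and counts each comma-separated token.
# Note: split(',') is a naive CSV parser; commas inside quoted fields split those fields too.
from operator import add

# rdd of raw lines (the header line is included, which is why the column names appear with count 1)
rdd = spark.sparkContext.textFile(path_google_scholar)
print(type(rdd))
counts = rdd.flatMap(lambda x: [(w.lower(), 1) for w in x.split(',')]).reduceByKey(add)
print(counts.take(5))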

<class 'pyspark.rdd.RDD'>
[('author_name', 1), ('email', 1), ('affiliation', 1), ('coauthors_names', 1), ('research_interest', 1)]
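Splitting raw lines on `,` treats every comma as a field separator, so a quoted field that itself contains commas gets broken into several tokens (whether coauthors_names in this file contains such commas is an assumption). A minimal sketch of the difference using Python's standard `csv` module; the sample line is hypothetical. The DataFrame reader `spark.read.csv`, used elsewhere in this notebook, honors double-quoted fields by default.

```python
import csv

# hypothetical CSV line with a quoted field that contains a comma
line = 'William Eberle,tntech.edu,"Tennessee Tech","Alice, Bob",data_mining##anomaly_detection'

# naive split: the comma inside "Alice, Bob" creates an extra field
naive = line.split(",")
# csv.reader respects the quotes and strips them
parsed = next(csv.reader([line]))

print(len(naive), naive)
print(len(parsed), parsed)
```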


### UDF to create a new integer (0/1) field "is_artificial_intelligence"

In [20]:
from pyspark.sql.types import IntegerType

lst_ai = ["data_science", "artificial_intelligence",
          "machine_learning"]

# without an explicit returnType, F.udf defaults to StringType
@F.udf(returnType=IntegerType())
def is_ai(research):
    """Return 1 if any research interest is in the AI domain, 0 if none is,
    and -1 if the value cannot be processed.
    """
    try:
        # split the research interest string on the delimiter "##"
        lst_research = [w.lower() for w in str(research).split("##")]

        for res in lst_research:
            # present in AI domain
            if res in lst_ai:
                return 1
        # not present in AI domain
        return 0
    except Exception:
        return -1

# df read
df_gs = spark.read.option("header", True).csv(path_google_scholar)
# create a new column "is_artificial_intelligence"
df_gs_new = df_gs.withColumn("is_artificial_intelligence",
                             is_ai(F.col("research_interest")))
# df_gs_new.printSchema()
df_gs.show(5, truncate=False)
df_gs_new.show(n=20)
print("Verify that is_artificial_intelligence has only two distinct values: 0 and 1")
df_gs_new.select("is_artificial_intelligence").distinct().show(5)
# show selected columns for analysis
df_gs_new[df_gs_new["author_name"].isin(["Christa Cody", "Gabriel Weimann", ""])]\
    .select("author_name", "research_interest", "is_artificial_intelligence")\
    .show(5, truncate=False)


+----------------------+------------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|author_name           |email             |affiliation                       |coauthors_names                                                                                                                                      |research_interest                                                                           |
+----------------------+------------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|William Eberle        |tntech.edu
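Because the body of a UDF is ordinary Python, its logic can be checked without a Spark session. A minimal sketch, assuming the same AI term list; `classify_ai` is a hypothetical plain-Python copy of `is_ai` without the `@F.udf` decorator.

```python
lst_ai = {"data_science", "artificial_intelligence", "machine_learning"}


def classify_ai(research):
    """Return 1 if any '##'-separated term is in lst_ai, else 0 (mirrors is_ai)."""
    terms = [w.lower() for w in str(research).split("##")]
    return 1 if any(t in lst_ai for t in terms) else 0


print(classify_ai("data_mining##anomaly_detection"))
print(classify_ai("artificial_intelligence##machine_learning##data_mining"))
print(classify_ai(None))  # str(None) == "None", which is not an AI term
```

Once the plain function behaves as expected, `F.udf(classify_ai, IntegerType())` wraps it for use on a DataFrame column.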

# Reference:
[View RDD content](https://stackoverflow.com/questions/25295277/view-rdd-contents-in-python-spark)


#### TOP 10 STATES BY AVERAGE WEEKLY 1ST DOSE VACCINE ALLOCATION

###### Write as CSV to Google Drive

In [68]:
# print sample
df_covid.show(n=2, truncate=False)
# group by state, average the weekly 1st-dose allocations, sort descending and rename the columns
df_avg_1 = df_covid.groupby("jurisdiction")\
  .agg(F.avg("_1st_dose_allocations")
  .alias("avg"))\
  .sort(F.col("avg").desc())\
  .toDF("state", "avg")

print("Top 10 States by 1st dose covid vaccine distribution")
df_avg_1.show(n=10)
print(type(df_avg_1))
# write the top 10 states as a single CSV file (coalesce(1) produces one part file)
df_avg_1.limit(10) \
        .coalesce(1) \
        .write \
        .mode("overwrite") \
        .option("header", True) \
        .option("quoteAll",True) \
        .csv(path_out_avg)

+------------+-----------------------+---------------------+---------------------+
|jurisdiction|week_of_allocations    |_1st_dose_allocations|_2nd_dose_allocations|
+------------+-----------------------+---------------------+---------------------+
|Connecticut |2021-06-21T00:00:00.000|54360                |54360                |
|Maine       |2021-06-21T00:00:00.000|21420                |21420                |
+------------+-----------------------+---------------------+---------------------+
only showing top 2 rows

Top 10 States by 1st dose covid vaccine distribution
+----------------+----------+
|           state|       avg|
+----------------+----------+
|      California|  561307.5|
|           Texas| 384333.75|
|         Florida|306883.125|
|Federal Entities|  213150.0|
|            Ohio| 168761.25|
|    Pennsylvania|166415.625|
|        New York| 164036.25|
|  North Carolina| 147026.25|
|         Georgia| 146036.25|
|        Illinois| 146036.25|
+----------------+----------+
only
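In the output above Georgia and Illinois tie at 146036.25, and the later aggregation in this notebook lists them in the opposite order: Spark does not guarantee an order among tied rows, so `limit(10)` can keep different rows when a tie falls at the cut-off. Adding a tie-breaker, for example `.sort(F.col("avg").desc(), F.col("jurisdiction"))` before the `toDF` rename, should make the order deterministic. The plain-Python sketch below shows the same two-key ordering on values copied from the outputs.

```python
# (state, avg) pairs copied from the 1st-dose averages in this notebook
rows = [
    ("Georgia", 146036.25),
    ("Illinois", 146036.25),
    ("North Carolina", 147026.25),
    ("Michigan", 144826.875),
]

# primary key: avg descending; tie-breaker: state name ascending
ranked = sorted(rows, key=lambda r: (-r[1], r[0]))
print(ranked)
```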

###### Rename the CSV part file written by Spark (non-human-readable name) to a human-readable name

In [None]:
import os
from os import listdir

def find_csv_filenames(path_to_dir, prefix="part-", suffix=".csv"):
    """ return the filenames in path_to_dir that start with prefix and end with suffix
    """
    filenames = listdir(path_to_dir)
    return [filename for filename in filenames
            if filename.startswith(prefix) and filename.endswith(suffix)]

# get the name of the CSV part file Spark just wrote
path_csv_file_path = path_out_avg + "/" + find_csv_filenames(path_out_avg)[0]
# new, human-readable file name
path_new_file = path_out_avg + "/" + "top_10_states.csv"
# old file name and new file name
print(f"path_csv_file_path: {path_csv_file_path} \n path_new_file: {path_new_file}")
# rename file
os.rename(path_csv_file_path, path_new_file)

###### PANDAS: Calculate Average For Each State

In [None]:
# PANDAS - example of reading a CSV and writing a CSV
path_new_file = path_out_avg + "/" + "top_10_states.csv"
print(f"input file -> {path_new_file}")
# read the file written (and renamed) above
df_in_state = pd.read_csv(path_new_file)
# sample
print(df_in_state.head(2))
# calculate avg for each state (the input file already has the average,
# so this only shows how to do the same operation with pandas)
df_in_top10 = df_in_state.groupby("state")["avg"].mean().to_frame("avg").reset_index()
# output file for the pandas result
path_new_file_pd = path_out_avg + "/" + "top_10_states_pandas.csv"
print(f"output to {path_new_file_pd}")
# write pandas df to csv (index=False: don't write the row index as an extra column)
df_in_top10.to_csv(path_new_file_pd, index=False)

## SPARK: Average of 1st and 2nd DOSE

In [52]:
# for each state, calculate average # 1st dose, average # 2nd dose

print(list(df_covid))

# per state (lower-cased jurisdiction): average and total weekly 1st- and 2nd-dose allocations
df_avg = df_covid.groupby(F.lower("jurisdiction").alias("state"))\
  .agg(F.avg("_1st_dose_allocations").alias("avg_1"), \
       F.avg("_2nd_dose_allocations").alias("avg_2"), \
       F.sum("_1st_dose_allocations").alias("sum_1"), \
       F.sum("_2nd_dose_allocations").alias("sum_2")
       ) \
  .sort(F.col("avg_1").desc())


df_avg.show(15)


[Column<'jurisdiction'>, Column<'week_of_allocations'>, Column<'_1st_dose_allocations'>, Column<'_2nd_dose_allocations'>]
+----------------+----------+----------+---------+---------+
|           state|     avg_1|     avg_2|    sum_1|    sum_2|
+----------------+----------+----------+---------+---------+
|      california|  561307.5|  561307.5|8980920.0|8980920.0|
|           texas| 384333.75| 384333.75|6149340.0|6149340.0|
|         florida|306883.125|306883.125|4910130.0|4910130.0|
|federal entities|  213150.0|  213150.0|3197250.0|3197250.0|
|            ohio| 168761.25| 168761.25|2700180.0|2700180.0|
|    pennsylvania|166415.625|166415.625|2662650.0|2662650.0|
|        new york| 164036.25| 164036.25|2624580.0|2624580.0|
|  north carolina| 147026.25| 147026.25|2352420.0|2352420.0|
|        illinois| 146036.25| 146036.25|2336580.0|2336580.0|
|         georgia| 146036.25| 146036.25|2336580.0|2336580.0|
|        michigan|144826.875|144826.875|2317230.0|2317230.0|
|      new jersey| 12913
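Because an average is `sum / count`, dividing `sum_1` by `avg_1` recovers how many weekly rows stand behind each state's average. A quick check on three rows copied from the table above:

```python
# (state, avg_1, sum_1) copied from the aggregated output above
rows = [
    ("california", 561307.5, 8980920.0),
    ("texas", 384333.75, 6149340.0),
    ("federal entities", 213150.0, 3197250.0),
]

# number of weekly allocation rows per state = sum / avg
weeks = {state: round(sum_1 / avg_1) for state, avg_1, sum_1 in rows}
print(weeks)
```

"federal entities" comes out one week short of the two states, so its average covers a shorter window. In Spark the count can be computed directly by adding `F.count("_1st_dose_allocations").alias("n_weeks")` to the `agg` call.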

### SPARK: Aggregate deaths and cases per state in the second covid data set (input for the join)

In [53]:

df_covid2.show(2)
# group by state (lower-cased so it matches the first data set)
# note: the NYT cases/deaths columns are cumulative per county and date,
# so summing them over all dates overstates the real totals
df_cases = df_covid2 \
          .groupby(F.lower("state").alias("state")) \
          .agg(F.sum("deaths").alias("sum_deaths"), \
              F.sum("cases").alias("sum_cases"))
df_cases.show(n=3)


+----------+---------+----------+-----+-----+------+
|      date|   county|     state| fips|cases|deaths|
+----------+---------+----------+-----+-----+------+
|2020-01-21|Snohomish|Washington|53061|    1|     0|
|2020-01-22|Snohomish|Washington|53061|    1|     0|
+----------+---------+----------+-----+-----+------+
only showing top 2 rows

+-------------+----------+------------+
|        state|sum_deaths|   sum_cases|
+-------------+----------+------------+
|west virginia| 1286901.0|  7.631901E7|
|new hampshire|  620816.0| 4.3191729E7|
|      alabama| 5005646.0|2.68440532E8|
+-------------+----------+------------+
only showing top 3 rows
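The NYT `us-counties.csv` file reports cumulative cases and deaths per county per date, so summing the column across all dates counts the same cases again every day; West Virginia's `sum_cases` of 7.631901E7 above is far larger than the state's population of under 2 million. A sketch of a common alternative, on hypothetical rows: keep each county's value on its latest date, then sum per state. In PySpark the same idea could use a window function or a filter on the maximum date.

```python
# hypothetical cumulative rows: (date, county, state, cases)
rows = [
    ("2020-03-01", "A", "X", 1),
    ("2020-03-02", "A", "X", 3),
    ("2020-03-03", "A", "X", 6),
    ("2020-03-02", "B", "X", 2),
    ("2020-03-03", "B", "X", 4),
]

# naive approach: sum the cumulative value over every date
naive = {}
for _, _, state, cases in rows:
    naive[state] = naive.get(state, 0) + cases

# keep only each county's latest cumulative value (ISO dates compare correctly as strings)
latest = {}
for date, county, state, cases in rows:
    key = (state, county)
    if key not in latest or date > latest[key][0]:
        latest[key] = (date, cases)

# then sum the latest values per state
totals = {}
for (state, _), (_, cases) in latest.items():
    totals[state] = totals.get(state, 0) + cases

print(naive, totals)
```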



### Spark INNER JOIN EXAMPLE & LEFT JOIN EXAMPLE

In [67]:
# get total # distinct states in the first covid (CDC Pfizer allocation) dataset and the second covid (NYT) dataset
c1 = df_avg.select("state").distinct().count()
c2 = df_cases.select("state").distinct().count()
print(f"c1: {c1} | c2: {c2}")
# create an alias for each of the DataFrame to be joined
df_m = df_avg.alias("df_m")
df_ny = df_cases.alias("df_ny")
print("EQUI JOIN")
# EQUI JOIN / INNER JOIN -> keeps only the rows whose "state" matches on both sides
df_inner = df_m.join(df_ny, F.col("df_m.state") == F.col("df_ny.state"), 'inner')
lst_interest = ["df_m.state", "df_ny.state", "df_m.avg_1", "df_m.avg_2", "df_ny.sum_deaths", "df_ny.sum_cases"]
# print all states
df_inner.select(*lst_interest).show(n=73, truncate=False)

# total distinct states count
c_inner = df_inner.select("df_m.state").distinct().count()
print(f"Total # states after inner join: {c_inner}")
 
print("LEFT JOIN")
# LEFT JOIN -> keeps all rows of the left DataFrame; right-side columns are null where there is no match
df_left = df_m.join(df_ny, F.col("df_m.state") == F.col("df_ny.state"), 'left')
df_left.show(n=100)

# total distinct states count
c_left = df_left.select("df_m.state").distinct().count()
print(f"Total # states after left join: {c_left}")

c1: 63 | c2: 56
EQUI JOIN
+--------------------+--------------------+----------+----------+-----------+-------------+
|state               |state               |avg_1     |avg_2     |sum_deaths |sum_cases    |
+--------------------+--------------------+----------+----------+-----------+-------------+
|west virginia       |west virginia       |27675.0   |27675.0   |1286901.0  |7.631901E7   |
|new hampshire       |new hampshire       |20711.25  |20711.25  |620816.0   |4.3191729E7  |
|alabama             |alabama             |70745.625 |70745.625 |5005646.0  |2.68440532E8 |
|new york            |new york            |164036.25 |164036.25 |2.777481E7 |9.36502157E8 |
|american samoa      |american samoa      |731.25    |0.0       |0.0        |115.0        |
|north carolina      |north carolina      |147026.25 |147026.25 |5992027.0  |4.67823807E8 |
|pennsylvania        |pennsylvania        |166415.625|166415.625|1.2362565E7|5.23851352E8 |
|alaska              |alaska              |15096.0   |
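The difference between the two join types can be seen on a tiny hypothetical example: the inner join keeps only keys present on both sides, while the left join keeps every left-side key and fills the missing right-side value with `None` (Spark shows `null`). The keys `state_a` to `state_d` are placeholders, not rows from the data sets.

```python
# hypothetical per-state values from two sources, one row per key on each side
left = {"state_a": 1.0, "state_b": 2.0, "state_c": 3.0}
right = {"state_a": 10.0, "state_b": 20.0, "state_d": 40.0}

# inner join: only keys found on both sides
inner = {k: (left[k], right[k]) for k in left if k in right}
# left join: every left key, None where the right side has no match
left_join = {k: (left[k], right.get(k)) for k in left}

print(inner)
print(left_join)
```

Since both `df_avg` and `df_cases` are grouped by state, each state appears at most once per side, so the left join should report as many distinct states as `c1` (63) and the inner join at most `min(c1, c2)` (56). With duplicate keys, a join would instead produce one row per matching pair.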

### REFERENCES

- https://www.tutorialspoint.com/pyspark/pyspark_rdd.htm
- https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
- https://sparkbyexamples.com/spark/print-the-contents-of-rdd-in-spark-pyspark/
- PipelinedRDD creation when using a map operation: https://stackoverflow.com/questions/44355416/need-instance-of-rdd-but-returned-class-pyspark-rdd-pipelinedrdd