# Data Processing in Spark

## Set up

(copied from lab 5)

In [1]:
#getting google drive mounted
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!rm spark-3.3.2-bin-hadoop3.tgz 

# Setting up our environment variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

!pip install -q findspark
import findspark
findspark.init()

--2023-03-09 18:19:27--  https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299360284 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.2-bin-hadoop3.tgz’


2023-03-09 18:19:28 (176 MB/s) - ‘spark-3.3.2-bin-hadoop3.tgz’ saved [299360284/299360284]



In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) #  This will format our output tables a bit nicer when not using the show() method
spark

Copying files over and removing whitespace

In [6]:
#bringing in stopwords file from drive into current directory
!cp /content/drive/MyDrive/project/stopwords.txt /content/
!mkdir articles
#copying all article files from drive to current directory
!cp /content/drive/MyDrive/project/wikipedia-ml/article* /content/articles/

#replacing spaces in the file names with underscores before putting the files in hadoop
#hadoop didn't like it when there were spaces in the file name
#inspiration for this: https://stackoverflow.com/questions/2709458/how-to-replace-spaces-in-file-names-using-a-bash-script
!for f in articles/*\ *; do mv "$f" "${f// /_}"; done
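
The same rename can be sketched in Python with `pathlib`, which avoids the shell quoting. The helper name `underscore_spaces` is made up for this example, and the demo runs against a temporary directory with invented file names rather than against `articles/`.

```python
import tempfile
from pathlib import Path


def underscore_spaces(directory):
    """Rename every file in `directory` whose name contains a space."""
    renamed = []
    # sorted() materializes the listing first, so renaming does not disturb iteration
    for p in sorted(Path(directory).iterdir()):
        if p.is_file() and " " in p.name:
            target = p.with_name(p.name.replace(" ", "_"))
            p.rename(target)
            renamed.append(target.name)
    return renamed


# Demo on throwaway files so nothing real is touched (file names are invented).
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "article 2013 01.txt").write_text("demo")
    (Path(tmp) / "article_2014.txt").write_text("demo")
    renamed = underscore_spaces(tmp)
    remaining = sorted(p.name for p in Path(tmp).iterdir())
    print(renamed, remaining)
```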

# MapReduce-like operations

My approach stores the intermediate results in text files, since I am not yet proficient with lambda functions. In a real-world setting, I would use lambda functions to simplify the code and avoid relying on txt files.
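
For comparison, a lambda-based version of the co-occurrence count might look roughly like the sketch below. It is not the code used in this notebook: `cooccurrence_pairs` and `count_pairs_with_spark` are hypothetical names, and the pairing rule (every later non-stopword on the same line) is taken from the loops in the map cell. The pure-Python helper can be checked without Spark; the Spark function only relies on the RDD methods `textFile`, `flatMap`, and `reduceByKey`.

```python
def cooccurrence_pairs(line, stopwords):
    """Return ((word_i, word_j), 1) for every ordered pair of non-stopwords on a line."""
    for ch in ".,?!":
        line = line.replace(ch, "")
    # Dropping stopwords first yields the same pairs as skipping them inside the loops.
    words = [w for w in line.split() if w not in stopwords]
    return [((words[i], words[j]), 1)
            for i in range(len(words))
            for j in range(i + 1, len(words))]


def count_pairs_with_spark(sc, path, stopwords):
    """Hypothetical Spark version: flatMap is the map step, reduceByKey the reduce step."""
    return (sc.textFile(path)
              .flatMap(lambda line: cooccurrence_pairs(line, stopwords))
              .reduceByKey(lambda a, b: a + b))


# Quick check of the map step on one made-up sentence.
sample = cooccurrence_pairs("Machine learning is fun.", {"is"})
print(sample)
```

With the notebook's `sc` and `stopwords`, a call such as `count_pairs_with_spark(sc, "/content/articles/*2013*", stopwords).take(10)` would return ten summed pairs. The glob pattern is an assumption about how the year appears in the file names.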

## Co-occurrence

Co-occurrence map-like function

In [17]:
sc = spark.sparkContext
with open("stopwords.txt") as s:
    stopwords = {line.strip() for line in s}

path = "/content/articles/"

#iterate through our years given
for year in range(2013, 2024):
  for filename in os.listdir(path):
    #only continue if the file is in the year we want
    if str(year) not in filename:
      continue

    #similar to map reduce, this is the map part
    #the map output goes to one text file per year, which the reduce will use as input
    #open the output with "a" so each article appends to the year's file instead of overwriting it
    #(opened once per article rather than once per pair)
    with open(path + filename) as f, open(f"output_{year}.txt", "a") as o:
      for line in f:
        #strip basic punctuation, then split on whitespace
        line = line.replace(".", "").replace(",", "").replace("?", "").replace("!", "")
        words = line.strip().split()
        for i in range(len(words)):
          #if the first word is a stopword, skip it
          if words[i] in stopwords:
            continue
          #pair it with every later word on the same line
          for j in range(i + 1, len(words)):
            #if the second word is a stopword, skip it
            if words[j] in stopwords:
              continue
            #emit the co-occurrence pair with a count of 1
            print('%s\t%s\t%s' % (words[i], words[j], 1), file=o)

    

Co-occurrence reducer-like function

In [25]:
#now that we have all the output txt files, we can read them in
#and perform a reduce for each year
delimiter = '\t'

for year in range(2013, 2024):
  #i like using python dictionaries to store stuff, so here it is
  myDict = {}
  #read through lines of each year's map output
  with open(f"/content/output_{year}.txt") as f:
    for line in f:
      first, second, count = line.strip().split(delimiter)
      key = first + "," + second

      #simple frequency table: add the emitted count to the running total
      #(the first occurrence now adds count too, instead of a hard-coded 1)
      myDict[key] = myDict.get(key, 0) + int(count)

  #putting this all in a new output file for each year
  #separating values with commas instead of tabs like in hadoop
  #also adding the header row to make the spark schema easier
  #this will make loading into pyspark much easier
  #using "w" so re-running the cell does not append a second header and duplicate rows
  with open(f"reduced_{year}.txt", "w") as r:
    print("word_0,word_1,count", file=r)
    for key, total in myDict.items():
      print(key + "," + str(total), file=r)
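
The frequency table in the reducer can also be built with `collections.Counter`, and the `csv` module takes care of quoting if a word ever contains a comma or a quote character. The sketch below runs on made-up mapper lines and writes to an in-memory buffer; `reduce_pairs` is a hypothetical helper, not part of the notebook.

```python
import csv
import io
from collections import Counter


def reduce_pairs(lines, delimiter="\t"):
    """Sum the counts of 'first<TAB>second<TAB>count' lines per (first, second) pair."""
    totals = Counter()
    for line in lines:
        first, second, count = line.rstrip("\n").split(delimiter)
        totals[(first, second)] += int(count)
    return totals


# Made-up mapper output for illustration.
mapped = ["machine\tlearning\t1\n", "machine\tlearning\t1\n", "deep\tlearning\t1\n"]
totals = reduce_pairs(mapped)

# Write the header plus one row per pair, highest count first.
buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(["word_0", "word_1", "count"])
for (first, second), total in totals.most_common():
    writer.writerow([first, second, total])
print(buffer.getvalue())
```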


Co-occurrence: loading the reduced output into Spark dataframes and showing the top counts

In [31]:
#now that the reducer has done its job, we can sift through the output
#and put all of it in a dataframe
dfs={}

for year in range(2013, 2024):
  dfs[year] = spark.read.options(delimiter=',', header=True, inferSchema=True).csv(f"/content/reduced_{year}.txt")


In [35]:
for year in range(2013, 2024):
  print(year)
  #dfs[year].printSchema()
  dfs[year].orderBy("count", ascending=False).show(10)

#dfs[2014].select("*").show(5)

2013
+----------+--------------+-----+
|    word_0|        word_1|count|
+----------+--------------+-----+
|  learning|      learning|   73|
|  learning|    algorithms|   60|
|  learning|          data|   58|
|   machine|      learning|   52|
|  learning|representation|   44|
|algorithms|representation|   44|
|algorithms|      learning|   39|
|algorithms|      features|   39|
|algorithms|    algorithms|   38|
|  learning|           can|   36|
+----------+--------------+-----+
only showing top 10 rows

2014
+----------+--------------+-----+
|    word_0|        word_1|count|
+----------+--------------+-----+
|  learning|      learning|   71|
|  learning|          data|   58|
|  learning|    algorithms|   55|
|   machine|      learning|   54|
|  learning|           can|   35|
|algorithms|      learning|   35|
|algorithms|representation|   34|
|  learning|representation|   34|
|algorithms|      features|   33|
|algorithms|    algorithms|   32|
+----------+--------------+-----+
only showing top 10 rows

In [39]:
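#deleting these files now that the tables above have been shown
#note: spark reads files lazily, so dfs[year] cannot be queried again after this
#unless the data was cached (e.g. with .cache()) and materialized first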
!rm output*
!rm reduced*

## Trigrams

Trigrams map-like function

In [40]:
#now doing similar map-reduce-spark for trigrams
#map part

#reusing the file-reading code from the co-occurrence mapper

path = "/content/articles/"

#iterate through our years given
for year in range(2013, 2024):
  for filename in os.listdir(path):
    #only continue if the file is in the year we want
    if str(year) not in filename:
      continue

    #similar to map reduce, this is the map part
    #open the output with "a" so each article appends to the year's file
    #(opened once per article rather than once per trigram)
    with open(path + filename) as f, open(f"output2_{year}.txt", "a") as o:
      for line in f:
        #strip basic punctuation, then split on whitespace
        line = line.replace(".", "").replace(",", "").replace("?", "").replace("!", "")
        words = line.strip().split()
        #emit every run of three consecutive words with a count of 1
        #(stopwords are kept here, unlike in the co-occurrence mapper)
        for i in range(len(words) - 2):
          key = words[i] + "," + words[i+1] + "," + words[i+2]
          print('%s\t%s' % (key, 1), file=o)
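
A sliding window of three is often written with `zip` over shifted copies of the list. The short sketch below produces the same comma-joined trigram keys as the map cell; `trigrams` is a hypothetical helper name and the sample sentence is made up.

```python
def trigrams(words):
    """Return a comma-joined key for each run of three consecutive words."""
    # zip stops at the shortest input, so lists shorter than three yield nothing
    return [",".join(t) for t in zip(words, words[1:], words[2:])]


words = "a set of training examples".split()
keys = trigrams(words)
print(keys)
```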
                    
            

Trigrams reduce-like function

In [41]:
#reduce part of trigram
#most of this code is copied from the previous co-occurrence reducer
delimiter = '\t'

for year in range(2013, 2024):
  #i like using python dictionaries to store stuff, so here it is
  myDict = {}
  #read through lines of each year's map output
  with open(f"/content/output2_{year}.txt") as f:
    for line in f:
      key, count = line.strip().split(delimiter)

      #simple frequency table: add the emitted count to the running total
      #(the first occurrence now adds count too, instead of a hard-coded 1)
      myDict[key] = myDict.get(key, 0) + int(count)

  #putting this all in a new output file for each year
  #separating values with commas instead of tabs like in hadoop
  #also adding the header row to make the spark schema easier
  #this will make loading into pyspark much easier
  #using "w" so re-running the cell does not append a second header and duplicate rows
  with open(f"reduced2_{year}.txt", "w") as r:
    print("word_0,word_1,word_2,count", file=r)
    for key, total in myDict.items():
      print(key + "," + str(total), file=r)



Trigrams: loading the reduced output into Spark dataframes and showing the top counts

In [43]:

dfs2={}

for year in range(2013, 2024):
  dfs2[year] = spark.read.options(delimiter=',', header=True, inferSchema=True).csv(f"/content/reduced2_{year}.txt")



In [44]:
for year in range(2013, 2024):
  print(year)
  dfs2[year].orderBy("count", ascending=False).show(10)

2013
+-------------+----------+--------+-----+
|       word_0|    word_1|  word_2|count|
+-------------+----------+--------+-----+
|            a|       set|      of|   11|
|           of|   machine|learning|   10|
|           in|polynomial|    time|    6|
|         with|   respect|      to|    6|
|   algorithms|   attempt|      to|    6|
|           be|      used|      to|    6|
|computational|  learning|  theory|    6|
|      results|      show|    that|    4|
|         spam|       and|non-spam|    4|
|           be|   learned|      in|    4|
+-------------+----------+--------+-----+
only showing top 10 rows

2014
+-------------+----------+----------+-----+
|       word_0|    word_1|    word_2|count|
+-------------+----------+----------+-----+
|           of|   machine|  learning|   12|
|            a|       set|        of|   10|
|           in|polynomial|      time|    6|
|           be|      used|        to|    6|
|   algorithms|   attempt|        to|    6|
|computational|  learnin

In [45]:
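#deleting these files now that the tables above have been shown
#note: spark reads files lazily, so dfs2[year] cannot be queried again after this
#unless the data was cached (e.g. with .cache()) and materialized first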
!rm output*
!rm reduced*