In [1]:
import csv
import math
import re
import shutil

import Stemmer
from pyspark.sql import SparkSession

In [3]:
def _one_line(text):
    """Collapse every line break in ``text`` to a single space."""
    return " ".join(text.splitlines())


def csv_parser(input_file, output_file):
    """Group the news column of a CSV by region and write one line per region.

    Each input row is read positionally as ``date, region, weather_news, ...``;
    only column 1 (region) and column 2 (news text) are used. The output file
    gets one line per region, in first-seen order, formatted as
    ``region,news_1,news_2,...`` and encoded as UTF-8.

    Rows with fewer than three fields (e.g. blank lines, which the csv module
    yields as ``[]``) are skipped instead of raising IndexError. Line breaks
    inside the region or news text are replaced with spaces, so every region
    occupies exactly one output line.

    Args:
        input_file: path of the CSV to read.
        output_file: path of the text file to (over)write.
    """
    region_data = {}

    # newline='' lets csv.reader handle line breaks embedded in quoted fields.
    with open(input_file, 'r', newline='') as csv_file:
        for row in csv.reader(csv_file):
            if len(row) < 3:
                continue
            region = _one_line(row[1])
            region_data.setdefault(region, []).append(_one_line(row[2]))

    # The file is read back as UTF-8 text downstream, so write it as UTF-8.
    with open(output_file, 'w', encoding='utf-8') as text_file:
        for region, news_items in region_data.items():
            combined_news = ",".join(news_items)
            text_file.write(f"{region},{combined_news}\n")

In [4]:
def mapper(key, value, stop_words_path="stop_words.txt", language="english"):
    """Emit normalised term frequencies for one document.

    Args:
        key: document id; passed through unchanged into every emitted pair.
        value: sequence whose first element is the document text. Only
            ``value[0]`` is tokenised.
        stop_words_path: newline-separated stop-word list, read on every call.
        language: algorithm name passed to ``Stemmer.Stemmer``.

    Yields:
        ``(stem, (key, tf))`` where ``tf`` is the stem's occurrence count
        divided by the number of non-stop-word tokens in the document.
        Nothing is yielded when every token is a stop word.
    """
    # Tokens are lower-cased before the stop-word test, so normalise the list
    # the same way. Otherwise capitalised or whitespace-padded entries never match.
    with open(stop_words_path) as f:
        stop_words = {line.strip().lower() for line in f}

    text = value[0]
    stemmer = Stemmer.Stemmer(language)

    words = {}
    length = 0  # number of non-stop-word tokens (the TF denominator)
    for match in re.finditer(r"\w+", text, re.UNICODE):
        token = match.group().lower()
        if token in stop_words:
            continue
        length += 1
        stem = stemmer.stemWord(token)
        words[stem] = words.get(stem, 0) + 1

    for word, count in words.items():
        yield word, (key, count / length)

def reducer(key, values, nb_documents):
    """Convert a term's ``(doc_id, tf)`` postings into ``(doc_id, tf * idf)``.

    ``values`` must support ``len()``; its length is taken as the term's
    document frequency, and ``idf = ln(nb_documents / document_frequency)``.
    Returns ``(key, weighted_postings)`` with postings in input order.
    """
    document_frequency = len(values)
    idf = math.log(nb_documents * 1. / document_frequency)
    weighted = [(doc_id, tf * idf) for doc_id, tf in values]
    return key, weighted

In [9]:
# builder.getOrCreate() reuses the active session if there is one and creates
# one otherwise; getActiveSession() alone returns None when none is active.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

input_csv = "weather_data.csv"
output_txt = "output_weather_data.txt"
tfidf_output_dir = "Output_Weather_News_TF_IDF"

csv_parser(input_csv, output_txt)

# Each line of output_txt is treated as one document.
rdd = sc.textFile(output_txt)
length = rdd.count()  # number of documents: the N in the IDF term

# Key each document by its 0-based line position. csv_parser joins fields with
# commas, so unless a line contains a tab, split("\t") yields a one-element list
# and mapper tokenises the whole line (region name included).
rdd_id = rdd.map(lambda x: x.split("\t")) \
    .zipWithIndex() \
    .map(lambda x: (x[1], x[0]))

rdd_result = rdd_id.flatMap(lambda l: list(mapper(l[0], l[1]))) \
    .groupByKey() \
    .map(lambda l: reducer(l[0], list(l[1]), length))

# saveAsTextFile refuses to write into an existing directory; clear the previous
# run's output so the notebook can be re-run top to bottom.
shutil.rmtree(tfidf_output_dir, ignore_errors=True)
rdd_result.saveAsTextFile(tfidf_output_dir)

In [11]:
# Preview the (doc_id, fields) pairs; "_1"/"_2" are the names toDF() assigns by default.
rdd_id.toDF(["_1", "_2"]).show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  0|[Burlington, VT,"...|
|  1|[Binghamton, NY,"...|
|  2|[Charleston, WV,"...|
|  3|[Albany, NY,"Brie...|
|  4|[Nashville, TN,"D...|
|  5|[Shreveport, LA,"...|
|  6|[Great Falls, MT,...|
|  7|[Pocatello, ID,"J...|
|  8|[NWS Phoenix,"***...|
|  9|[Gray - Portland,...|
| 10|[Philadelphia/Mt ...|
| 11|[NWS Wilmington, ...|
| 12|[Charleston, SC,"...|
| 13|[Newport/Morehead...|
+---+--------------------+



In [12]:
# Show only a sample: collect() would pull the entire TF-IDF index onto the driver.
rdd_result.take(20)

[('review', [(0, 0.06543117346153533)]),
 ('juli',
  [(0, 0.017185467286610214),
   (1, 0.038615441813924525),
   (3, 0.004590378679204935),
   (4, 0.004190732651511157),
   (7, 0.015403270679109895),
   (10, 0.07701635339554948),
   (13, 0.006943043544840854)]),
 ('21',
  [(0, 0.017185467286610214),
   (1, 0.003861544181392453),
   (3, 0.004590378679204935),
   (4, 0.002514439590906694),
   (5, 0.0033812057588290013),
   (11, 0.018733707582701226),
   (13, 0.002314347848280285)]),
 ('event',
  [(0, 0.0828273037021731),
   (1, 0.006979180882982552),
   (7, 0.08351753123302454),
   (13, 0.05019417569931358)]),
 ('august',
  [(0, 0.011456978191073476),
   (1, 0.021238492997658488),
   (3, 0.004590378679204935),
   (4, 0.00586702571211562),
   (5, 0.0033812057588290013),
   (7, 0.015403270679109895),
   (13, 0.01504326101382185)]),
 ('2020',
  [(0, 0.005728489095536738),
   (1, 0.003861544181392453),
   (4, 0.007543318772720082),
   (5, 0.010143617276487004),
   (6, 0.046209812037329684),

In [23]:
# PySpark's saveAsTextFile writes UTF-8; decode explicitly rather than with the locale default.
with open('Output_Weather_News_TF_IDF/part-00000', 'r', encoding='utf-8') as file:
    file_contents_0 = file.read()

print(file_contents_0)

('review', [(0, 0.06543117346153533)])
('juli', [(0, 0.017185467286610214), (1, 0.038615441813924525), (3, 0.004590378679204935), (4, 0.004190732651511157), (7, 0.015403270679109895), (10, 0.07701635339554948), (13, 0.006943043544840854)])
('21', [(0, 0.017185467286610214), (1, 0.003861544181392453), (3, 0.004590378679204935), (4, 0.002514439590906694), (5, 0.0033812057588290013), (11, 0.018733707582701226), (13, 0.002314347848280285)])
('event', [(0, 0.0828273037021731), (1, 0.006979180882982552), (7, 0.08351753123302454), (13, 0.05019417569931358)])
('august', [(0, 0.011456978191073476), (1, 0.021238492997658488), (3, 0.004590378679204935), (4, 0.00586702571211562), (5, 0.0033812057588290013), (7, 0.015403270679109895), (13, 0.01504326101382185)])
('2020', [(0, 0.005728489095536738), (1, 0.003861544181392453), (4, 0.007543318772720082), (5, 0.010143617276487004), (6, 0.046209812037329684), (7, 0.046209812037329684), (13, 0.002314347848280285)])
('first', [(0, 0.021810391153845112)])


In [24]:
# PySpark's saveAsTextFile writes UTF-8; decode explicitly rather than with the locale default.
with open('Output_Weather_News_TF_IDF/part-00001', 'r', encoding='utf-8') as file:
    file_contents_1 = file.read()

print(file_contents_1)

('burlington', [(0, 0.021810391153845112)])
('vt', [(0, 0.06543117346153533)])
('2022', [(0, 0.007002461656092592), (1, 0.016521128196964975), (2, 0.05295611627420023), (4, 0.0071718077662762105), (5, 0.012399480883715177), (7, 0.07531536536775144)])
('widespread', [(0, 0.016081902058308374), (5, 0.006328163086358743)])
('sever', [(0, 0.05956475967163725), (1, 0.080304578498809), (4, 0.039840170918738885), (5, 0.010045067484694225), (13, 0.015470074715576668)])
('vermont', [(0, 0.043620782307690224)])
('2', [(0, 0.005728489095536738), (1, 0.005792316272088679), (3, 0.027542272075229614), (4, 0.005028879181813388), (5, 0.006762411517658003), (7, 0.015403270679109895), (13, 0.003471521772420427)])
('tornado', [(0, 0.004624923867234898), (1, 0.0077940917539752455), (2, 0.034975986745963916), (3, 0.0037060648207643885), (4, 0.05819462849147079), (5, 0.06187621720261584), (11, 0.015124751025281694), (13, 0.0214877514566189)])
('warn', [(0, 0.021810391153845112)])
('30', [(0, 0.0046249238672

In [32]:
def mapper_count(key, value, stop_words_path="stop_words.txt", language="english"):
    """Emit raw per-document term counts for one document.

    Args:
        key: document id; passed through unchanged into every emitted pair.
        value: sequence whose first element is the document text. Only
            ``value[0]`` is tokenised.
        stop_words_path: newline-separated stop-word list, read on every call.
        language: algorithm name passed to ``Stemmer.Stemmer``.

    Yields:
        ``(stem, (key, count))`` with the integer number of occurrences of the
        stem among the document's non-stop-word tokens.
    """
    # Tokens are lower-cased before the stop-word test, so normalise the list
    # the same way. Otherwise capitalised or whitespace-padded entries never match.
    with open(stop_words_path) as f:
        stop_words = {line.strip().lower() for line in f}

    text = value[0]
    stemmer = Stemmer.Stemmer(language)

    words = {}
    for match in re.finditer(r"\w+", text, re.UNICODE):
        token = match.group().lower()
        if token in stop_words:
            continue
        stem = stemmer.stemWord(token)
        words[stem] = words.get(stem, 0) + 1

    for word, count in words.items():
        yield word, (key, count)

def reducer_count(key, values, nb_documents):
    """Return ``(key, postings)`` with postings as a list of ``(doc_id, count)`` tuples.

    ``nb_documents`` is not used here; the signature mirrors ``reducer``.
    Each element of ``values`` must unpack into exactly two items.
    """
    postings = [(doc_id, n) for doc_id, n in values]
    return key, postings

In [27]:
# (stem, (doc_id, raw_count)) pairs; flatMap consumes mapper_count's generator directly.
rdd_result_count = rdd_id.flatMap(lambda doc: mapper_count(*doc))

In [33]:
# Show only a sample: collect() would pull the entire inverted index onto the driver.
rdd_result_count.groupByKey() \
    .map(lambda l: reducer_count(l[0], list(l[1]), length)) \
    .take(20)

[('review', [(0, 3)]),
 ('juli', [(0, 3), (1, 20), (3, 1), (4, 5), (7, 1), (10, 1), (13, 6)]),
 ('21', [(0, 3), (1, 2), (3, 1), (4, 3), (5, 3), (11, 1), (13, 2)]),
 ('event', [(0, 8), (1, 2), (7, 3), (13, 24)]),
 ('august', [(0, 2), (1, 11), (3, 1), (4, 7), (5, 3), (7, 1), (13, 13)]),
 ('2020', [(0, 1), (1, 2), (4, 9), (5, 9), (6, 1), (7, 3), (13, 2)]),
 ('first', [(0, 1)]),
 ('6', [(0, 1), (4, 2), (7, 1), (13, 1)]),
 ('year', [(0, 1)]),
 ('2019', [(0, 1), (1, 4), (4, 6), (5, 8), (13, 3)]),
 ('weather', [(0, 4), (1, 7), (4, 5), (5, 7), (13, 6)]),
 ('signific', [(0, 1)]),
 ('wind', [(0, 4), (1, 1), (4, 11), (5, 5), (6, 1), (7, 2), (13, 4)]),
 ('76', [(0, 1)]),
 ('north', [(0, 1), (3, 1), (5, 6)]),
 ('18', [(0, 3), (1, 2), (3, 1), (4, 5), (5, 2), (13, 5)]),
 ('2018', [(0, 2), (1, 5), (3, 2), (4, 3), (5, 5), (6, 1), (7, 1), (13, 2)]),
 ('microburst', [(0, 2), (4, 2), (7, 1), (10, 1)]),
 ('high', [(0, 2), (1, 1), (4, 2), (13, 1)]),
 ('4', [(0, 1), (1, 1), (3, 1), (4, 3), (5, 2), (13, 1)]),

In [34]:
# saveAsTextFile refuses to write into an existing directory; clear the previous
# run's output so this cell can be re-executed.
shutil.rmtree("Inverted_Index", ignore_errors=True)
rdd_result_count.groupByKey() \
    .map(lambda l: reducer_count(l[0], list(l[1]), length)) \
    .saveAsTextFile("Inverted_Index")

In [35]:
# PySpark's saveAsTextFile writes UTF-8; decode explicitly rather than with the locale default.
# NOTE(review): reads exactly two part files; the number of partitions may differ.
with open('Inverted_Index/part-00000', 'r', encoding='utf-8') as file:
    file_contents_0_II = file.read()
with open('Inverted_Index/part-00001', 'r', encoding='utf-8') as file:
    file_contents_1_II = file.read()

print(file_contents_0_II)
print(file_contents_1_II)

('review', [(0, 3)])
('juli', [(0, 3), (1, 20), (3, 1), (4, 5), (7, 1), (10, 1), (13, 6)])
('21', [(0, 3), (1, 2), (3, 1), (4, 3), (5, 3), (11, 1), (13, 2)])
('event', [(0, 8), (1, 2), (7, 3), (13, 24)])
('august', [(0, 2), (1, 11), (3, 1), (4, 7), (5, 3), (7, 1), (13, 13)])
('2020', [(0, 1), (1, 2), (4, 9), (5, 9), (6, 1), (7, 3), (13, 2)])
('first', [(0, 1)])
('6', [(0, 1), (4, 2), (7, 1), (13, 1)])
('year', [(0, 1)])
('2019', [(0, 1), (1, 4), (4, 6), (5, 8), (13, 3)])
('weather', [(0, 4), (1, 7), (4, 5), (5, 7), (13, 6)])
('signific', [(0, 1)])
('wind', [(0, 4), (1, 1), (4, 11), (5, 5), (6, 1), (7, 2), (13, 4)])
('76', [(0, 1)])
('north', [(0, 1), (3, 1), (5, 6)])
('18', [(0, 3), (1, 2), (3, 1), (4, 5), (5, 2), (13, 5)])
('2018', [(0, 2), (1, 5), (3, 2), (4, 3), (5, 5), (6, 1), (7, 1), (13, 2)])
('microburst', [(0, 2), (4, 2), (7, 1), (10, 1)])
('high', [(0, 2), (1, 1), (4, 2), (13, 1)])
('4', [(0, 1), (1, 1), (3, 1), (4, 3), (5, 2), (13, 1)])
('2017', [(0, 2), (1, 9), (3, 1), (4, 1