In [1]:
import os
import pyspark
conf = pyspark.SparkConf()

conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory','4g')
sc = pyspark.SparkContext(conf=conf)

spark = pyspark.SQLContext.getOrCreate(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/24 21:52:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




In [2]:
import warnings
warnings.filterwarnings('ignore')

**Question 2. Spark – Language Models**

In [3]:
romeo_juliet = spark.read\
.text("shared/hw2/romeo-juliet-pg1777.txt")

In [4]:
romeo_juliet.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+
|               value|
+--------------------+
|                    |
|This Etext file i...|
|cooperation with ...|
|Future and Shakes...|
|Etexts that are N...|
|                    |
|*This Etext has c...|
|                    |
|<<THIS ELECTRONIC...|
|SHAKESPEARE IS CO...|
|PROVIDED BY PROJE...|
|MACHINE READABLE ...|
|(1) ARE FOR YOUR ...|
|DISTRIBUTED OR US...|
|DISTRIBUTION INCL...|
|TIME OR FOR MEMBE...|
|                    |
|*Project Gutenber...|
|in the presentati...|
|for your reading ...|
+--------------------+
only showing top 20 rows



                                                                                

In [5]:
from pyspark.sql.functions import *
romeo_juliet = romeo_juliet.select(trim(lower(regexp_replace(col('value'),'[^\sa-zA-Z0-9]',''))).alias('new_line'))

In [6]:
from pyspark.sql import functions as F

In [7]:
romeo_juliet_unigrams = romeo_juliet.withColumn('words',F.explode(F.split(col('new_line'),' '))).groupBy('words').count().sort('count',ascending=False)

In [8]:
unigrams = romeo_juliet_unigrams.where(col('words')!='')

***Unigrams***

In [9]:
unigrams.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----+-----+
|words|count|
+-----+-----+
|  and|  773|
|  the|  748|
|    i|  585|
|   to|  580|
|    a|  483|
|   of|  474|
|   is|  383|
| that|  365|
|   my|  360|
|   in|  328|
|  you|  323|
| thou|  277|
|  not|  275|
| with|  268|
|  for|  267|
|   me|  265|
| this|  258|
|   it|  237|
|   be|  234|
|  but|  186|
+-----+-----+
only showing top 20 rows



                                                                                

In [10]:
romeo_juliet_bigrams = romeo_juliet.withColumn("value",split(col("new_line")," "))

In [11]:
romeo_juliet_bigrams.show()

+--------------------+--------------------+
|            new_line|               value|
+--------------------+--------------------+
|                    |                  []|
|this etext file i...|[this, etext, fil...|
|cooperation with ...|[cooperation, wit...|
|future and shakes...|[future, and, sha...|
|etexts that are n...|[etexts, that, ar...|
|                    |                  []|
|this etext has ce...|[this, etext, has...|
|                    |                  []|
|this electronic v...|[this, electronic...|
|shakespeare is co...|[shakespeare, is,...|
|provided by proje...|[provided, by, pr...|
|machine readable ...|[machine, readabl...|
|1 are for your or...|[1, are, for, you...|
|distributed or us...|[distributed, or,...|
|distribution incl...|[distribution, in...|
|time or for membe...|[time, or, for, m...|
|                    |                  []|
|project gutenberg...|[project, gutenbe...|
|in the presentati...|[in, the, present...|
|for your reading ...|[for, your

In [12]:
from pyspark.ml.feature import NGram

In [13]:
romeo_juliet_bigrams = NGram(n=2,inputCol="value",outputCol="bigrams").transform(romeo_juliet_bigrams)
romeo_juliet_bigrams.show()

+--------------------+--------------------+--------------------+
|            new_line|               value|             bigrams|
+--------------------+--------------------+--------------------+
|                    |                  []|                  []|
|this etext file i...|[this, etext, fil...|[this etext, etex...|
|cooperation with ...|[cooperation, wit...|[cooperation with...|
|future and shakes...|[future, and, sha...|[future and, and ...|
|etexts that are n...|[etexts, that, ar...|[etexts that, tha...|
|                    |                  []|                  []|
|this etext has ce...|[this, etext, has...|[this etext, etex...|
|                    |                  []|                  []|
|this electronic v...|[this, electronic...|[this electronic,...|
|shakespeare is co...|[shakespeare, is,...|[shakespeare is, ...|
|provided by proje...|[provided, by, pr...|[provided by, by ...|
|machine readable ...|[machine, readabl...|[machine readable...|
|1 are for your or...|[1,

In [14]:
bigrams = romeo_juliet_bigrams.select(explode("bigrams").alias("bigram")).groupBy("bigram").count().sort('count',ascending=False)

In [15]:
bigram = bigrams.where(col('bigram')!=' ')

***Bigrams***

In [16]:
bigram.show()

[Stage 6:>                                                          (0 + 1) / 1]

+--------+-----+
|  bigram|count|
+--------+-----+
|  of the|   63|
|  i will|   60|
|    i am|   52|
|  in the|   46|
|  to the|   39|
|  i have|   35|
|   it is|   35|
|  is the|   30|
|thou art|   27|
| for the|   24|
|  with a|   24|
|   to be|   24|
| in this|   24|
|  that i|   23|
|   is my|   23|
| give me|   22|
| of this|   22|
|   to my|   22|
|   of my|   22|
|art thou|   21|
+--------+-----+
only showing top 20 rows



                                                                                

In [17]:
bigram = bigram.withColumn("split_bigrams",split(col("bigram")," "))
bigram.show()

[Stage 9:>                                                          (0 + 1) / 1]

+--------+-----+-------------+
|  bigram|count|split_bigrams|
+--------+-----+-------------+
|  of the|   63|    [of, the]|
|  i will|   60|    [i, will]|
|    i am|   52|      [i, am]|
|  in the|   46|    [in, the]|
|  to the|   39|    [to, the]|
|  i have|   35|    [i, have]|
|   it is|   35|     [it, is]|
|  is the|   30|    [is, the]|
|thou art|   27|  [thou, art]|
| for the|   24|   [for, the]|
|  with a|   24|    [with, a]|
|   to be|   24|     [to, be]|
| in this|   24|   [in, this]|
|  that i|   23|    [that, i]|
|   is my|   23|     [is, my]|
| give me|   22|   [give, me]|
| of this|   22|   [of, this]|
|   to my|   22|     [to, my]|
|   of my|   22|     [of, my]|
|art thou|   21|  [art, thou]|
+--------+-----+-------------+
only showing top 20 rows



                                                                                

In [18]:
unigram = unigrams.withColumnRenamed("count","unigram_count")
bigram = bigram.withColumnRenamed("count","bigram_count")

In [19]:
result_df = bigram.join(unigram,bigram.split_bigrams[0] == unigram.words,"inner")
result_df.show()

                                                                                

+-----------------+------------+--------------------+------------+-------------+
|           bigram|bigram_count|       split_bigrams|       words|unigram_count|
+-----------------+------------+--------------------+------------+-------------+
|     personal use|           9|     [personal, use]|    personal|            9|
|        access to|           1|        [access, to]|      access|            2|
|   infringement a|           1|   [infringement, a]|infringement|            1|
|      daughter to|           1|      [daughter, to]|    daughter|           14|
|        while you|           1|        [while, you]|       while|            6|
| other servingmen|           1| [other, servingmen]|       other|           28|
|            ben i|           3|            [ben, i]|         ben|           64|
|       come madam|           1|       [come, madam]|        come|           98|
|        she agree|           1|        [she, agree]|         she|          114|
|    pleasure stay|         

In [20]:
bigram_format = F.udf(lambda x:x[1]+"|"+x[0])

In [21]:
result_df = result_df.withColumn("Bigram_format",bigram_format(col('split_bigrams')) )

In [22]:
result_df = result_df.withColumn("Probability",(F.col("bigram_count")/F.col("unigram_count")))

In [23]:
result_df.show()

[Stage 23:>                                                         (0 + 1) / 1]

+-----------------+------------+--------------------+------------+-------------+-----------------+--------------------+
|           bigram|bigram_count|       split_bigrams|       words|unigram_count|    Bigram_format|         Probability|
+-----------------+------------+--------------------+------------+-------------+-----------------+--------------------+
|     personal use|           9|     [personal, use]|    personal|            9|     use|personal|                 1.0|
|        access to|           1|        [access, to]|      access|            2|        to|access|                 0.5|
|   infringement a|           1|   [infringement, a]|infringement|            1|   a|infringement|                 1.0|
|      daughter to|           1|      [daughter, to]|    daughter|           14|      to|daughter| 0.07142857142857142|
|        while you|           1|        [while, you]|       while|            6|        you|while| 0.16666666666666666|
| other servingmen|           1| [other,

                                                                                

In [63]:
result_df.select(result_df.columns[5:7]).show(10)

+----------------+--------------------+
|   Bigram_format|         Probability|
+----------------+--------------------+
|    use|personal|                 1.0|
|       to|access|                 0.5|
|  a|infringement|                 1.0|
|     to|daughter| 0.07142857142857142|
|       you|while| 0.16666666666666666|
|servingmen|other| 0.03571428571428571|
|           i|ben|            0.046875|
|      madam|come| 0.01020408163265306|
|       agree|she|0.008771929824561403|
|   stay|pleasure|                 0.2|
+----------------+--------------------+
only showing top 10 rows



**Question 3. Spark Bloom Filters and Broadcast Joins**

In [25]:
Files_of_A_1 = ["shared/midterm/drive_stats_2019_Q1/"+"2019-01-0%s.csv"%(str(i)) if i<10 else "shared/midterm/drive_stats_2019_Q1/"+"2019-01-%s.csv"%(str(i)) for i in range(1,32)]

In [26]:
Files_of_A_1

['shared/midterm/drive_stats_2019_Q1/2019-01-01.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-02.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-03.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-04.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-05.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-06.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-07.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-08.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-09.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-10.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-11.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-12.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-13.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-14.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-15.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-16.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-17.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-18.csv',
 'shared/midterm/drive_stats

In [27]:
Files_of_A_2 = ["shared/midterm/drive_stats_2019_Q1/"+"2019-02-0%s.csv"%(str(i)) if i<10 else "shared/midterm/drive_stats_2019_Q1/"+"2019-02-%s.csv"%(str(i)) for i in range(1,29)]

In [28]:
Files_of_A_2

['shared/midterm/drive_stats_2019_Q1/2019-02-01.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-02.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-03.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-04.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-05.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-06.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-07.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-08.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-09.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-10.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-11.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-12.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-13.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-14.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-15.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-16.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-17.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-02-18.csv',
 'shared/midterm/drive_stats

In [29]:
Files_of_A_3 = ["shared/midterm/drive_stats_2019_Q1/"+"2019-03-0%s.csv"%(str(i)) if i<10 else "shared/midterm/drive_stats_2019_Q1/"+"2019-03-%s.csv"%(str(i)) for i in range(1,31)]

In [30]:
Files_of_A_3

['shared/midterm/drive_stats_2019_Q1/2019-03-01.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-02.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-03.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-04.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-05.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-06.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-07.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-08.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-09.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-10.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-11.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-12.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-13.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-14.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-15.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-16.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-17.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-03-18.csv',
 'shared/midterm/drive_stats

In [31]:
Files_of_B = "shared/midterm/drive_stats_2019_Q1/2019-03-31.csv"

In [32]:
Files_of_A = Files_of_A_1 + Files_of_A_2 + Files_of_A_3

In [33]:
Files_of_A

['shared/midterm/drive_stats_2019_Q1/2019-01-01.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-02.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-03.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-04.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-05.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-06.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-07.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-08.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-09.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-10.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-11.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-12.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-13.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-14.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-15.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-16.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-17.csv',
 'shared/midterm/drive_stats_2019_Q1/2019-01-18.csv',
 'shared/midterm/drive_stats

In [34]:
df_A = spark.read.load(Files_of_A,format="csv", inferSchema="true", header="true")

                                                                                

In [35]:
df_B = spark.read.load(Files_of_B,format="csv", inferSchema="true", header="true")

                                                                                

In [36]:
df_A.count()

                                                                                

9470808

In [37]:
List_model = [x[0] for x in df_B.select("model").distinct().collect()]

                                                                                

In [38]:
List_model

['ST4000DM000',
 'ST12000NM0007',
 'ST8000DM005',
 'TOSHIBA MQ01ABF050M',
 'ST8000NM0055',
 'Seagate BarraCuda SSD ZA2000CM10002',
 'TOSHIBA MG07ACA14TA',
 'WDC WD60EFRX',
 'ST8000DM002',
 'ST4000DM005',
 'DELLBOSS VD',
 'HGST HUS726040ALE610',
 'TOSHIBA HDWF180',
 'HGST HMS5C4040ALE640',
 'HGST HUH721010ALE600',
 'TOSHIBA MD04ABA500V',
 'TOSHIBA MD04ABA400V',
 'ST10000NM0086',
 'TOSHIBA MQ01ABF050',
 'ST500LM021',
 'Seagate BarraCuda SSD ZA500CM10002',
 'ST6000DM004',
 'ST6000DM001',
 'HGST HMS5C4040BLE640',
 'ST6000DX000',
 'WDC WD5000LPVX',
 'WDC WD5000BPKT',
 'TOSHIBA HDWE160',
 'ST500LM030',
 'ST500LM012 HN',
 'HGST HUH728080ALE600',
 'HGST HDS5C4040ALE630',
 'Hitachi HDS5C4040ALE630',
 'HGST HUH721212ALE600',
 'WDC WD5000LPCX',
 'ST8000DM004',
 'HGST HUH721212ALN604',
 'HGST HMS5C4040BLE641',
 'ST1000LM024 HN']

In [39]:
from bloom_filter2 import BloomFilter

In [40]:
#pip install bloom-filter2

In [41]:
print(len(List_model))

39


In [42]:
bloom = BloomFilter(max_elements=50, error_rate=0.001)

In [43]:
for i in List_model:
    bloom.add(i)

In [44]:
assert 'ST1000LM024 HN' in bloom

In [45]:
bloom_filter_broadcast = sc.broadcast(bloom)

In [46]:
def model_filter( bloom_filter_broadcast ):
    bloom_filter_broadcast_value = bloom_filter_broadcast.value
    def _filter(df_A):
        for x in df_A:
            if x['model'] in bloom_filter_broadcast_value:
                yield x
    return _filter

In [47]:
df_A.rdd.mapPartitions(model_filter(bloom_filter_broadcast)).count()

                                                                                

9470149

**Question 4. Ranking over Partitions - In Spark**


In [48]:
BreadBasket = spark\
.read.option("header",True)\
.option("inferSchema",True)\
.csv("shared/hw2/BreadBasket_DMS.csv")

In [49]:
from pyspark.sql.functions import col,hour

In [50]:
BreadBasket_new = BreadBasket.withColumn("Hour", hour(col("Time")))

In [51]:
BreadBasket_new

Date,Time,Transaction,Item,Hour
2016-10-30 00:00:00,2022-10-24 09:58:11,1,Bread,9
2016-10-30 00:00:00,2022-10-24 10:05:34,2,Scandinavian,10
2016-10-30 00:00:00,2022-10-24 10:05:34,2,Scandinavian,10
2016-10-30 00:00:00,2022-10-24 10:07:57,3,Hot chocolate,10
2016-10-30 00:00:00,2022-10-24 10:07:57,3,Jam,10
2016-10-30 00:00:00,2022-10-24 10:07:57,3,Cookies,10
2016-10-30 00:00:00,2022-10-24 10:08:41,4,Muffin,10
2016-10-30 00:00:00,2022-10-24 10:13:03,5,Coffee,10
2016-10-30 00:00:00,2022-10-24 10:13:03,5,Pastry,10
2016-10-30 00:00:00,2022-10-24 10:13:03,5,Bread,10


In [52]:
BreadBasket_new.registerTempTable("BreadBasket_new")

In [53]:
df_BreadBasket = spark.sql("select Item, Hour, count(*) as Total from BreadBasket_new where Item!='NONE' group by Item,Hour order by Hour")


In [54]:
df_BreadBasket.show(10)

+---------+----+-----+
|     Item|Hour|Total|
+---------+----+-----+
|    Bread|   1|    1|
|   Coffee|   7|   13|
|Medialuna|   7|    6|
|    Toast|   7|    1|
|    Bread|   7|    2|
|   Pastry|   7|    2|
|   Muesli|   8|    2|
|    Toast|   8|   23|
|Medialuna|   8|   43|
| Panatone|   8|    1|
+---------+----+-----+
only showing top 10 rows



In [55]:
df_BreadBasket.registerTempTable("HourCount")

In [56]:
df_BreadBasket = spark.sql('select Item,Hour,Total,dense_rank() over(partition by Hour order by Total) as Rank from HourCount')

In [57]:
df_BreadBasket_least_sold = df_BreadBasket.where('rank in (1,2)')

In [58]:
df_BreadBasket_least_sold.registerTempTable("Least_sold_ascending_order")

***Bottom 2 items sold every hour***

In [59]:
df_BreadBasket_least_sold.show()

+-----------------+----+-----+----+
|             Item|Hour|Total|Rank|
+-----------------+----+-----+----+
|            Bread|   1|    1|   1|
|            Toast|   7|    1|   1|
|            Bread|   7|    2|   2|
|           Pastry|   7|    2|   2|
|         Panatone|   8|    1|   1|
|Gingerbread syrup|   8|    1|   1|
|             Eggs|   8|    1|   1|
|           Tiffin|   8|    1|   1|
|   Dulce de Leche|   8|    1|   1|
|          Granola|   8|    1|   1|
|         Truffles|   8|    1|   1|
|        Nomad bag|   8|    1|   1|
|             Coke|   8|    1|   1|
|          Tartine|   8|    1|   1|
|   Vegan mincepie|   8|    1|   1|
|         Siblings|   8|    2|   2|
|           Muesli|   8|    2|   2|
|            Fudge|   8|    2|   2|
|         Mortimer|   8|    2|   2|
|         Bakewell|   8|    2|   2|
+-----------------+----+-----+----+
only showing top 20 rows



In [60]:
df_BreadBasket = spark.sql('select Item,Hour,Total,dense_rank() over(partition by Hour order by Total desc) as Rank from HourCount')

In [61]:
df_BreadBasket_most_sold = df_BreadBasket.where('rank in (1,2)')

***Top 2 items sold every hour***

In [62]:
df_BreadBasket_most_sold.show()

+---------+----+-----+----+
|     Item|Hour|Total|Rank|
+---------+----+-----+----+
|    Bread|   1|    1|   1|
|   Coffee|   7|   13|   1|
|Medialuna|   7|    6|   2|
|   Coffee|   8|  199|   1|
|    Bread|   8|  171|   2|
|   Coffee|   9|  583|   1|
|    Bread|   9|  400|   2|
|   Coffee|  10|  820|   1|
|    Bread|  10|  508|   2|
|   Coffee|  11|  946|   1|
|    Bread|  11|  528|   2|
|   Coffee|  12|  740|   1|
|    Bread|  12|  474|   2|
|   Coffee|  13|  607|   1|
|    Bread|  13|  340|   2|
|   Coffee|  14|  636|   1|
|    Bread|  14|  341|   2|
|   Coffee|  15|  519|   1|
|    Bread|  15|  310|   2|
|   Coffee|  16|  321|   1|
+---------+----+-----+----+
only showing top 20 rows

