# RDD / DataFrame Joins - Lab 5

Reference/API Links


*   [Apache Spark Quick Start](https://spark.apache.org/docs/3.3.0/quick-start.html)
*   [PySpark v3.3.0 API](https://spark.apache.org/docs/3.3.0/api/python/reference/index.html)
*   [RDD Programming Guide](https://spark.apache.org/docs/3.3.0/rdd-programming-guide.html)
*   [Spark SQL Programming Guide](https://spark.apache.org/docs/3.3.0/sql-programming-guide.html)









In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=a95ee4de6e347a48779d77672a3ae26feccb098189eff5a661d81d113127cfdf
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

# Imports




In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import types as sparktypes
from pyspark.sql.functions import col

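# getOrCreate() reuses an existing context/session, so this cell can be re-run safely
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()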

In [None]:
# download sample access log / hostname_country.csv for use in code below
!rm -f apache.access.log
!wget -q https://raw.githubusercontent.com/databricks/reference-apps/master/logs_analyzer/data/apache.access.log

!rm -f hostname_country.csv
!wget -q https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?rtime=gKcu8Dc82kg\&download=1 -O hostname_country.csv

# Apache HTTP Log - Resilient Distributed Dataset (RDD)

A SparkContext instance can be used to create RDDs from various data/files/resources (text files, CSV, Hadoop data files, etc.)

In [None]:
# find the top 10 clients, map/reduce style using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], 1 ))  # field 0 = client address
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1]))

print("Total count of client hostnames:")
print(access_log_rdd.count())

print("Top 10 client hostnames:")
print(access_log_rdd.take(10))


Total count of client hostnames:
169
Top 10 client hostnames:
[('64.242.88.10', 452), ('10.0.0.153', 188), ('cr020r01-3.sac.overture.com', 44), ('h24-71-236-129.ca.shawcable.net', 36), ('h24-70-69-74.ca.shawcable.net', 32), ('market-mail.panduit.com', 29), ('ts04-ip92.hevanet.com', 28), ('ip68-228-43-49.tc.ph.cox.net', 22), ('proxy0.haifa.ac.il', 19), ('207.195.59.160', 15)]
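
To make the three transformations above concrete, here is a plain-Python sketch (no Spark required) of what `map`, `reduceByKey`, and `sortBy` compute. The three sample log lines are made up for illustration and are not taken from `apache.access.log`.

```python
from collections import defaultdict

# Hypothetical sample lines in Apache access-log layout (illustration only).
sample_lines = [
    '10.0.0.1 - - [07/Mar/2004:16:05:49 -0800] "GET /a HTTP/1.1" 200 100',
    '10.0.0.2 - - [07/Mar/2004:16:06:51 -0800] "GET /b HTTP/1.1" 200 200',
    '10.0.0.1 - - [07/Mar/2004:16:10:02 -0800] "GET /c HTTP/1.1" 404 0',
]

# map: emit a (client, 1) pair per line; field 0 is the client address.
pairs = [(line.split(" ")[0], 1) for line in sample_lines]

# reduceByKey: combine all values that share a key with x + y.
counts = defaultdict(int)
for host, n in pairs:
    counts[host] += n

# sortBy: order by descending count, mirroring the key lambda t: -t[1].
top_clients = sorted(counts.items(), key=lambda t: -t[1])
print(top_clients)
```

In Spark the same steps run per partition and are merged across the cluster, but the result per key is the same as this single-process version.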


# Apache HTTP Log - DataFrame

A DataFrame is equivalent to a relational table in Spark SQL, and can be created from a variety of input formats (CSV, JSON, relational database, etc.) using the SparkSession.

In [None]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                col('_c5').alias('path'),
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))



named_df.show(truncate=False)
named_df.printSchema()

+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|host        |timestamp            |path                                                                                             |status|content_size|
+------------+---------------------+-------------------------------------------------------------------------------------------------+------+------------+
|64.242.88.10|[07/Mar/2004:16:05:49|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1   |401   |12846       |
|64.242.88.10|[07/Mar/2004:16:06:51|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1                            |200   |4523        |
|64.242.88.10|[07/Mar/2004:16:10:02|GET /mailman/listinfo/hsdivision HTTP/1.1                                                        |200   |6291        |
|64.242.88.10|[07/Mar/2004:16:11:58|GET /twiki/bin/view/TWiki/WikiSynt

# Part 1: Reporting Tasks from Prior Lab

1.   Given an Apache HTTP access log, along with a CSV file that includes country information for hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), produce a report that shows the total request count for each country, sorted by request count (highest to lowest).
2.   Using the same two input files (access.log and [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)), produce a report that lists, for each country, the count of each URL visited. Sort by country (alphabetically, A-Z), then by count (highest to lowest). For example:

```
  Argentina      /home                115
  Argentina      /another/page.html   105
  ...
  United States  /robots.txt          185
  United States  /another/page.html   120
  Uruguay        /home                310
  Uruguay        /another/page.html   120
```
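
The second report needs a two-level sort: ascending on country, descending on count. A plain-Python sketch of that logic follows; the hosts, countries, and paths are hypothetical stand-ins for the two input files, not real data.

```python
from collections import Counter

# Hypothetical stand-ins for hostname_country.csv and the access log.
host_country = {"h1": "Uruguay", "h2": "Argentina", "h3": "Argentina"}
requests = [("h1", "/home"), ("h2", "/home"), ("h3", "/home"),
            ("h2", "/about"), ("h1", "/home")]

# Join each request to its country, then count per (country, path).
# Counting after the join merges hosts that share a country.
url_counts = Counter((host_country[host], path)
                     for host, path in requests
                     if host in host_country)

# Sort by country A-Z, then by count from highest to lowest.
report = sorted(url_counts.items(), key=lambda kv: (kv[0][0], -kv[1]))
for (country, path), count in report:
    print(country, path, count)
```

Negating the count inside the sort key is one way to mix ascending and descending order in a single pass.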







# (A) RDD Implementations

Perform reporting tasks 1 and 2 using RDD transformations

[RDD APIs PySpark v3.3.0](https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.html#rdd-apis)

In [None]:
# RDD implementation
# (1) Given an Apache HTTP access log, along with a CSV file that includes country information for
# hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)),
# produce a report that shows the total request count for each country, sorted by request count (highest to lowest)
host_names_rdd = (sc.textFile("hostname_country.csv")
                  .map(lambda line: ( line.split(",")[0], line.split(",")[1] )))
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], 1 ))  # field 0 = client address
                  .join(host_names_rdd)
                  .map(lambda x: (x[1][1], x[1][0]))
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda t: -t[1]))

print(access_log_rdd.take(10))

[('Unknown Location', 812), ('Intranet', 198), ('Canada', 127), ('United States', 87), ('Robot', 80), ('Israel', 19), ('Australia', 17), ('France', 16), ('Wherever You Want to Ship', 13), ('Germany', 13)]


In [None]:
# RDD implementation
# (2) Using the same two input files (access.log and hostname_country.csv), produce a report that lists,
# for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest)
host_names_rdd = (sc.textFile("hostname_country.csv")
                  .map(lambda line: ( line.split(",")[0], line.split(",")[1] )))
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: ( line.split(" ")[0], (line.split(" ")[6], 1)))
                  .join(host_names_rdd)
                  .map(lambda x: ((x[1][1], x[1][0][0]), x[1][0][1]))
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda x: (x[0][0], -x[1])))

print(access_log_rdd.take(10))

[(('Australia', '/twiki/bin/view/Main/SpamAssassinDeleting'), 2), (('Australia', '/mailman/listinfo'), 2), (('Australia', '/twiki/bin/view/Main/SpamAssassinAndPostFix'), 2), (('Australia', '/twiki/bin/view/Main/SpamAssassinTaggingOnly'), 1), (('Australia', '/icons/mailman.jpg'), 1), (('Australia', '/mailman/admin/webct'), 1), (('Australia', '/mailman/admin'), 1), (('Australia', '/icons/gnu-head-tiny.jpg'), 1), (('Australia', '/mailman'), 1), (('Australia', '/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif'), 1)]


# (B) DataFrame Implementations

Perform reporting tasks 1 and 2 using Spark's DataFrame API. Note that you should *not* use the Spark SQL abstraction.

In [None]:
from pyspark.sql.functions import to_timestamp

# quote="" disables quote handling, so the request splits into method (_c5), path (_c6), protocol (_c7)
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log", quote="")
log_named_df = access_log_df.select(col('_c0').alias('host'),
                                # substr drops the leading "["; "ss" is seconds ("SS" would be fraction-of-second)
                                to_timestamp(col('_c3').substr(2,50), "dd/MMM/yyyy:HH:mm:ss").alias('timestamp'),
                                col('_c6').alias('path'),
                                col('_c8').cast('integer').alias('status'),
                                col('_c9').cast('integer').alias('content_size'))

host_names_df = spark.read.options(delimiter=",").csv("hostname_country.csv", quote="")
host_named_df = host_names_df.select(col('_c0').alias('host'),
                                col('_c1').alias('country'))

log_named_df.show(truncate=False)
host_named_df.show(truncate=False)

+------------+-------------------+------------------------------------------------------------------------------------+------+------------+
|host        |timestamp          |path                                                                                |status|content_size|
+------------+-------------------+------------------------------------------------------------------------------------+------+------------+
|64.242.88.10|2004-03-07 16:05:49|/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables   |401   |12846       |
|64.242.88.10|2004-03-07 16:06:51|/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2                            |200   |4523        |
|64.242.88.10|2004-03-07 16:10:02|/mailman/listinfo/hsdivision                                                        |200   |6291        |
|64.242.88.10|2004-03-07 16:11:58|/twiki/bin/view/TWiki/WikiSyntax                                                    |200   |7352        |

In [None]:
# DataFrame implementation
# (1) Given an Apache HTTP access log, along with a CSV file that includes country information for
# hostnames (sample file: [hostname_country.csv](https://cpslo-my.sharepoint.com/:x:/g/personal/amigler_calpoly_edu/EcJv6-ZtZR5PpTEcKuDcYmsBq38YIwrNL6d6JtVztjrkhA?e=xTmRdh)),
# produce a report that shows the total request count for each country, sorted by request count (highest to lowest)
cc_df = (log_named_df
         .join(host_named_df, 'host')   # attach a country to every request
         .groupBy('country')            # aggregate per country, not per host
         .count()
         .orderBy('count', ascending = False))

cc_df.show(truncate=False)



In [None]:
# DataFrame implementation
# (2) Using the same two input files (access.log and hostname_country.csv), produce a report that lists,
# for each country, the count of each URL visited. Sort by country (alphabetically, A-Z) then count (from highest to lowest)
cc_df = (log_named_df
         .join(host_named_df, 'host')   # attach a country to every request
         .groupBy('country', 'path')    # count per (country, URL) across all hosts in that country
         .count()
         .orderBy(['country', 'count'], ascending = [True, False]))

cc_df.show(truncate=False)

+---------+-----------------------------------------------+-----+
|country  |path                                           |count|
+---------+-----------------------------------------------+-----+
|Australia|/twiki/bin/view/Main/SpamAssassinAndPostFix    |2    |
|Australia|/mailman/listinfo                              |2    |
|Australia|/twiki/bin/view/Main/SpamAssassinDeleting      |2    |
|Australia|/icons/PythonPowered.png                       |1    |
|Australia|/icons/mailman.jpg                             |1    |
|Australia|/twiki/bin/view/Main/SpamAssassinTaggingOnly   |1    |
|Australia|/mailman/listinfo/administration               |1    |
|Australia|/mailman                                       |1    |
|Australia|/icons/gnu-head-tiny.jpg                       |1    |
|Australia|/mailman/admin/webct                           |1    |
|Australia|/twiki/bin/view/Main/WebHome                   |1    |
|Australia|/mailman/admin                                 |1    |
|Australia

# Part 2: Reporting Tasks Based on Student / Course Data



1.   Find the names of the students who have taken at least one of the courses with the greatest difficulty.
2.   Find the average course difficulty of the classes taken by each student. Print an average course difficulty of 0 if a student hasn't taken any classes (hint: use a left outer or right outer join).
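
The hint in task 2 matters because an inner join drops students with no grades. Below is a minimal plain-Python sketch of left-outer-join semantics, using a few hypothetical rows rather than the lab data:

```python
# Hypothetical rows for illustration (not the lab data).
students = {1: "Ann", 2: "Bob"}              # student ID -> name
grades = [(1, "X101"), (1, "X202")]          # (student ID, course)
difficulty = {"X101": 1, "X202": 3}          # course -> difficulty

averages = {}
for sid, name in students.items():
    # Left outer join: every student is kept, even with no matching grade rows.
    taken = [difficulty[course] for s, course in grades if s == sid]
    # A student with no courses gets an average of 0 instead of being dropped.
    averages[name] = sum(taken) / len(taken) if taken else 0

print(averages)
```

Here "Bob" has no grade rows, so an inner join would omit him entirely, while the left outer join keeps him with a default of 0.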



In [None]:
# Data for exercises below

# course information: course code, difficulty (1-3)
courses = [("CSC365", 1),
           ("CSC369", 1),
           ("CSC430", 3),
           ("CSC469", 2)]
courses_df = spark.createDataFrame(courses, ["Course", "Difficulty"])
courses_rdd = courses_df.rdd.map(tuple)

# student information: unique student ID, name, email
students = [(1,"Nikita","Trevett","ntrevett0@abc.net.au"),
            (2,"Jacquenette","Tolson","jtolson1@ucoz.com"),
            (3,"Helsa","St Ledger","hstledger2@elpais.com"),
            (4,"Eli","Golland","egolland3@wordpress.com"),
            (5,"Amitie","Mytton","amytton4@mit.edu"),
            (6,"Adan","Holtum","aholtum1i@amazon.co.uk")]
students_df = spark.createDataFrame(students, ["ID", "FirstName", "LastName", "Email"])
students_rdd = students_df.rdd.map(tuple)


# student grades: student ID, course code, grade earned
student_grades = [(1, "CSC365", "A"),
                  (1, "CSC369", "A-"),
                  (1, "CSC469", "B"),
                  (2, "CSC369", "B"),
                  (3, "CSC365", "A"),
                  (3, "CSC430", "B")]
student_grades_df = spark.createDataFrame(student_grades, ["StudentId", "Course", "Grade"])
student_grades_rdd = student_grades_df.rdd.map(tuple)

courses_df.show(truncate=False)
students_df.show(truncate=False)
student_grades_df.show(truncate=False)

+------+----------+
|Course|Difficulty|
+------+----------+
|CSC365|1         |
|CSC369|1         |
|CSC430|3         |
|CSC469|2         |
+------+----------+

+---+-----------+---------+-----------------------+
|ID |FirstName  |LastName |Email                  |
+---+-----------+---------+-----------------------+
|1  |Nikita     |Trevett  |ntrevett0@abc.net.au   |
|2  |Jacquenette|Tolson   |jtolson1@ucoz.com      |
|3  |Helsa      |St Ledger|hstledger2@elpais.com  |
|4  |Eli        |Golland  |egolland3@wordpress.com|
|5  |Amitie     |Mytton   |amytton4@mit.edu       |
|6  |Adan       |Holtum   |aholtum1i@amazon.co.uk |
+---+-----------+---------+-----------------------+

+---------+------+-----+
|StudentId|Course|Grade|
+---------+------+-----+
|1        |CSC365|A    |
|1        |CSC369|A-   |
|1        |CSC469|B    |
|2        |CSC369|B    |
|3        |CSC365|A    |
|3        |CSC430|B    |
+---------+------+-----+



In [None]:
s = students_df.withColumnRenamed("ID", "StudentId")
s.join(student_grades_df, "StudentId").show()

+---------+-----------+---------+--------------------+------+-----+
|StudentId|  FirstName| LastName|               Email|Course|Grade|
+---------+-----------+---------+--------------------+------+-----+
|        1|     Nikita|  Trevett|ntrevett0@abc.net.au|CSC365|    A|
|        1|     Nikita|  Trevett|ntrevett0@abc.net.au|CSC369|   A-|
|        1|     Nikita|  Trevett|ntrevett0@abc.net.au|CSC469|    B|
|        2|Jacquenette|   Tolson|   jtolson1@ucoz.com|CSC369|    B|
|        3|      Helsa|St Ledger|hstledger2@elpais...|CSC365|    A|
|        3|      Helsa|St Ledger|hstledger2@elpais...|CSC430|    B|
+---------+-----------+---------+--------------------+------+-----+



# (A) RDD Implementations

In [None]:
# RDD implementation
# (1) Find the names of the students that have taken at least one of the courses with the greatest difficulty.
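rdd1 = (students_rdd
        .map(lambda x: (x[0], (x[1], x[2])))                    # (id, (first, last))
        .join(student_grades_rdd.map(lambda g: (g[0], g[1])))   # (id, ((first, last), course))
        .map(lambda x: (x[1][1], x[1][0]))                      # (course, (first, last))
        .join(courses_rdd)                                      # (course, ((first, last), difficulty))
        .map(lambda x: (x[1][1], x[1][0])))                     # (difficulty, (first, last))

# keep every student at the greatest difficulty, not just the single largest tuple
max_difficulty = courses_rdd.map(lambda c: c[1]).max()
hardest_rdd = rdd1.filter(lambda x: x[0] == max_difficulty).values().distinct()

print(hardest_rdd.collect())

[('Helsa', 'St Ledger')]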


In [None]:
# RDD implementation
# (2) Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0
#     if a student hasn't taken any classes (hint, use left outer or right outer join).
rdd2 = (students_rdd
        .map(lambda x: (x[0], (x[1], x[2])))
        .leftOuterJoin(student_grades_rdd)
        .map(lambda x: (x[1][1], x[1][0]))
        .leftOuterJoin(courses_rdd)
        .map(lambda x: (x[1][0], x[1][1] if x[1][1] is not None else 0))
        .groupByKey()
        .mapValues(lambda x: sum(list(x)) / len(list(x))))

print(rdd2.collect())

[(('Nikita', 'Trevett'), 1.3333333333333333), (('Eli', 'Golland'), 0.0), (('Jacquenette', 'Tolson'), 1.0), (('Amitie', 'Mytton'), 0.0), (('Helsa', 'St Ledger'), 2.0), (('Adan', 'Holtum'), 0.0)]
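
`groupByKey` collects every value for a key before averaging. A common alternative is to carry a running `(sum, count)` pair per key, which is the shape that `reduceByKey` or `aggregateByKey` work with. The sketch below shows that combine step in plain Python on hypothetical pairs; it illustrates the idea and is not a replacement for the cell above.

```python
# Hypothetical (student, difficulty) pairs; 0 marks "no course taken".
pairs = [("Ann", 1), ("Ann", 3), ("Bob", 0), ("Ann", 2)]

# Fold each value into a running (sum, count) accumulator per key.
acc = {}
for key, value in pairs:
    total, count = acc.get(key, (0, 0))
    acc[key] = (total + value, count + 1)

# Finish by dividing sum by count for each key.
means = {key: total / count for key, (total, count) in acc.items()}
print(means)
```

Because the accumulator has a fixed size, partial results from different partitions can be merged without holding all values for a key in memory.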


# (B) DataFrame Implementations

Spark SQL is not permitted for these exercises.

In [None]:
# DataFrame implementation
# (1) Find the names of the students that have taken at least one of the courses with the greatest difficulty.
from pyspark.sql import functions as F   # avoids shadowing Python's built-in max()

# greatest difficulty across all courses
max_difficulty = courses_df.agg(F.max("Difficulty")).first()[0]

# students with at least one course at that difficulty
df1 = (s.join(student_grades_df, "StudentId")
       .join(courses_df, "Course")
       .filter(col("Difficulty") == max_difficulty)
       .select("FirstName", "LastName")
       .distinct())

df1.show()

+---------+---------+
|FirstName| LastName|
+---------+---------+
|    Helsa|St Ledger|
+---------+---------+



In [None]:
# DataFrame implementation
# (2) Find the average course difficulty of the classes that are taken by each student. Print average course difficulty of 0
#     if a student hasn't taken any classes (hint, use left outer or right outer join).
df2 = (students_df
       .join(student_grades_df, students_df["ID"] == student_grades_df["StudentId"], "left")
       .join(courses_df, "Course", "left")
       .fillna(0, subset=["Difficulty"])   # students with no courses get difficulty 0
       .select("FirstName", "LastName", "Difficulty")
       .groupBy("FirstName", "LastName")
       .agg({"Difficulty": "mean"}))

df2.show()

+-----------+---------+------------------+
|  FirstName| LastName|   avg(Difficulty)|
+-----------+---------+------------------+
|     Amitie|   Mytton|               0.0|
|      Helsa|St Ledger|               2.0|
|       Adan|   Holtum|               0.0|
|     Nikita|  Trevett|1.3333333333333333|
|Jacquenette|   Tolson|               1.0|
|        Eli|  Golland|               0.0|
+-----------+---------+------------------+

