# **AIM**

## Python / Java / Spark code that enables:

*   ### Writing the given CSV data to distributed storage, using the described layout strategy, so that duplicate data is reduced
*   ### Retrieving any record from the distributed storage given its record ID



# **Setup Apache Spark**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-3.0.3-bin-hadoop2.7'

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

In [None]:
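# Expose the Spark UI (port 4050, set above) through an ngrok tunnel
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# -o overwrites an existing ngrok binary instead of prompting
!unzip -o ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
# Query ngrok's local API (port 4040) for the public tunnel URL
!curl -s http://localhost:4040/api/tunnels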

{"tunnels":[{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://1131-34-125-8-227.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p
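The `curl` call above returns the ngrok agent's JSON description of its tunnels, and the Spark UI address is the `public_url` field. A minimal sketch that reads it from Python instead, assuming the ngrok agent's local API is at `http://localhost:4040/api/tunnels` as in the cell above; the helper names `public_urls` and `fetch_tunnels` are hypothetical.

```python
import json
import urllib.request
from typing import List


def public_urls(tunnels_json: str) -> List[str]:
    """Return the public_url of every tunnel in an ngrok /api/tunnels response."""
    payload = json.loads(tunnels_json)
    return [tunnel["public_url"] for tunnel in payload.get("tunnels", [])]


def fetch_tunnels(api_url: str = "http://localhost:4040/api/tunnels") -> str:
    """Fetch the raw JSON from the local ngrok agent API (assumed to listen on port 4040)."""
    with urllib.request.urlopen(api_url, timeout=5) as resp:
        return resp.read().decode("utf-8")
```

In the notebook, once the agent is running, `print(public_urls(fetch_tunnels()))` would print the tunnel URL(s) to open the Spark UI.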

# **Setup Metadata**

In [None]:
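# Set up metadata to reduce duplication.
# Assumes `data` is the DataFrame already loaded from the given CSV.
from pyspark.sql import Window
from pyspark.sql.functions import dense_rank

# Rows sharing the same (University, Difficulty Level, Course Rating) get the same ID;
# dense_rank() leaves no gaps, and subtracting 1 makes the IDs start at 0.
# Note: a window without partitionBy moves all rows into a single partition.
window = Window.orderBy(['University', 'Difficulty Level', 'Course Rating'])
data = data.withColumn('recordID', dense_rank().over(window) - 1)
data.orderBy(['University', 'Difficulty Level', 'Course Rating', 'recordID']).show()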

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+
|         Course Name|          University|    Difficulty Level|       Course Rating|          Course URL|  Course Description|              Skills|recordID|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+
|  If you have com...| signed up to the...| met all the asse...| this course is f...| but this course ...| through the Conn...| US district lead...|       0|
|The Business of P...|Advancing Women i...|            Advanced|                  48|https://www.cours...|Sponsored by AMAZ...|market segmentati...|       1|
|Acing the Product...|Advancing Women i...|            Advanced|                   5|https://www.cours...|Sponsored by AMAZ...|process managemen...|       2|
|The Art  Science ...|Advancing Women i...|         
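`dense_rank()` gives the same rank to rows that tie on every ordering column and leaves no gaps between consecutive ranks, so after subtracting 1 the IDs run 0, 1, 2, ... across the distinct (University, Difficulty Level, Course Rating) combinations in sort order. The plain-Python sketch below reproduces that numbering without Spark, assuming non-null string values (the columns appear to be compared as strings here, since a Course Rating of "48" sorts before "5"). The function name `dense_rank_ids` and the sample rows are illustrative only.

```python
from typing import Dict, List, Sequence, Tuple


def dense_rank_ids(rows: List[Dict[str, str]], keys: Sequence[str]) -> List[int]:
    """Mimic `dense_rank().over(Window.orderBy(keys)) - 1` for non-null string values.

    Rows with identical values in every key column receive the same ID; IDs are
    consecutive (no gaps), start at 0 and follow ascending key order.
    """
    # Distinct key tuples in ascending order, i.e. the window's sort order
    distinct_keys = sorted({tuple(row[k] for k in keys) for row in rows})
    id_of: Dict[Tuple[str, ...], int] = {key: i for i, key in enumerate(distinct_keys)}
    # Look up each row's ID from its key tuple
    return [id_of[tuple(row[k] for k in keys)] for row in rows]


KEYS = ["University", "Difficulty Level", "Course Rating"]
rows = [
    {"University": "B", "Difficulty Level": "Advanced", "Course Rating": "4.5"},
    {"University": "A", "Difficulty Level": "Beginner", "Course Rating": "4.7"},
    {"University": "B", "Difficulty Level": "Advanced", "Course Rating": "4.5"},
]
ids = dense_rank_ids(rows, KEYS)
print(ids)               # -> [1, 0, 1]
print(sorted(set(ids)))  # -> [0, 1], the distinct IDs a metadata table would hold
```

Three rows collapse to two IDs because the first and third rows share all three key values, which is the duplication the metadata table avoids storing twice.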

In [None]:
# Metadata: one row per distinct recordID
metadata = data.select("recordID").drop_duplicates()
metadata.show()

+--------+
|recordID|
+--------+
|       0|
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
|      13|
|      14|
|      15|
|      16|
|      17|
|      18|
|      19|
+--------+
only showing top 20 rows



# **Retrieve any record given the record ID**

In [None]:
# Sample one recordID at random (without replacement) from metadata
record_ID = metadata.rdd.takeSample(False, 1)[0][0]
record_ID

2404

In [None]:
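# Retrieve every record stored under this ID (rows sharing the same
# University, Difficulty Level and Course Rating share one recordID)
result_df = data.filter(data.recordID == record_ID)
result_df.show()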

+--------------------+--------------------+----------------+-------------+--------------------+--------------------+--------------------+--------+
|         Course Name|          University|Difficulty Level|Course Rating|          Course URL|  Course Description|              Skills|recordID|
+--------------------+--------------------+----------------+-------------+--------------------+--------------------+--------------------+--------+
|Lesson  Business ...|University of Was...|    Intermediate|           42|https://www.cours...|This lesson is pa...|english language ...|    1541|
+--------------------+--------------------+----------------+-------------+--------------------+--------------------+--------------------+--------+
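Because several rows can share a `recordID`, the filter above returns every matching row, and for an unpartitioned source Spark typically scans all of `data` to find them. One common alternative, sketched below in plain Python as an illustration rather than the notebook's own method, is to group rows by ID once so that each lookup is a single dictionary access; the helper names `build_index` and `get_records` and the sample rows are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def build_index(rows: List[dict], id_col: str = "recordID") -> Dict[int, List[dict]]:
    """Group rows by their record ID in a single pass over the data."""
    index: Dict[int, List[dict]] = defaultdict(list)
    for row in rows:
        index[row[id_col]].append(row)
    return dict(index)


def get_records(index: Dict[int, List[dict]], record_id: int) -> List[dict]:
    """Return all rows stored under record_id, or an empty list if the ID is unknown."""
    return index.get(record_id, [])


rows = [
    {"Course Name": "Course A", "recordID": 0},
    {"Course Name": "Course B", "recordID": 1},
    {"Course Name": "Course C", "recordID": 1},
]
index = build_index(rows)
print([r["Course Name"] for r in get_records(index, 1)])  # -> ['Course B', 'Course C']
print(get_records(index, 99))                             # -> []
```

The trade-off is one up-front grouping pass and extra memory in exchange for constant-time lookups on repeated queries.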



In [None]:
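# Store the retrieved record(s) as CSV on Google Drive (Drive must be mounted at /content/drive)
from google.colab import drive
drive.mount('/content/drive')
result_df.toPandas().to_csv("/content/drive/My Drive/spark_preprocessed_data.csv", index=False)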