# Map Reduce

<a id='installing-spark'></a>
### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark with Hadoop
3.   Findspark (used to locate the Spark installation on the system)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [None]:
import os

# Point the process environment at the Java 8 runtime and at the unpacked
# Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
# List the working directory to confirm the Spark archive was downloaded and unpacked.
!ls

drive  sample_data  spark-3.1.1-bin-hadoop3.2  spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import findspark

# findspark.init() runs first so that the pyspark import below can be resolved
# (pyspark itself is not pip-installed by this notebook).
findspark.init()

from pyspark.sql import SparkSession

# Local session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Eager evaluation makes DataFrames render as formatted tables in the notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Last expression of the cell: display the session summary.
spark

## Exploring the Dataset

In [None]:
# Location of the input text file parsed in the next cell.
# NOTE(review): the path lives under /content/drive, so it assumes Google Drive
# is already mounted; no mount step is visible in this notebook — confirm.
p1_address = '/content/drive/MyDrive/Tehran Polytechnic/BigData/HW01/P1/data.txt'

In [None]:
# Parse the input file into `data`, a list of [key, values] pairs:
#   key    -> first space-separated token of the line (kept as a string)
#   values -> the following tokens, minus the last two (list of strings)
# The context manager guarantees the file is closed even if parsing raises.
data = []
with open(p1_address, 'r') as file1:
    for line in file1:
        line_values = line.split(' ')
        # NOTE(review): [1:-2] discards the final two tokens of every line —
        # presumably trailing separator/newline residue; confirm against the
        # file format that no real value is being dropped.
        data.append([line_values[0], line_values[1:-2]])

In [None]:
import pandas as pd

# Wrap the parsed [key, values] pairs in a two-column pandas frame.
df = pd.DataFrame(
    data=data,
    columns=['key', 'value'],
)

# Last expression of the cell: render the frame.
df

Unnamed: 0,key,value
0,5988,"[748, 1722, 3752, 4655, 5743, 1872, 3413, 5527..."
1,5989,"[4080, 4264, 4446, 3779, 2430, 2297, 6169, 353..."
2,5982,"[217, 595, 1194, 3308, 2940, 1815, 794, 1503, ..."
3,5983,"[1165, 3836, 4361, 1282, 716, 4289, 4646, 6300..."
4,5980,"[2731, 3712, 1587, 6084, 2472, 2546, 6313, 875..."
...,...,...
6584,5637,"[2557, 3805, 4131, 2650, 4016, 5716]"
6585,5630,"[1165, 3868, 3614, 3615, 5421, 661, 133, 4452,..."
6586,5631,"[3949, 3934, 5294, 3889, 5333, 3352]"
6587,5632,"[2912, 4366, 2040, 1602, 4395, 133, 403, 2178]"


In [None]:
# Write the pandas frame to p1.csv in the working directory with ';' as the
# field separator (presumably because the list-valued column contains commas).
df.to_csv('p1.csv', sep=';')

In [None]:
from pyspark.sql import SparkSession

# A session was already created earlier in this notebook, so getOrCreate()
# returns that same session instead of starting a new one. The builder
# settings are kept identical to the first cell ("local[*]"): asking for a
# different master here would not take effect on the running session and
# would only make the reported configuration misleading.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Turn the parsed [key, values] pairs into a Spark DataFrame with named
# columns, then preview the first rows.
columns = ['key', 'value']
df_spark = spark.createDataFrame(data, schema=columns)
df_spark.show()

+----+--------------------+
| key|               value|
+----+--------------------+
|5988|[748, 1722, 3752,...|
|5989|[4080, 4264, 4446...|
|5982|[217, 595, 1194, ...|
|5983|[1165, 3836, 4361...|
|5980|[2731, 3712, 1587...|
|5981|[3569, 5353, 4087...|
|5986|[2658, 3712, 2650...|
|5987|[2614, 5716, 1765...|
|5984|[590, 4898, 745, ...|
|5985|[3233, 2254, 212,...|
|6294|[4898, 1127, 3220...|
| 270|[2658, 3003, 3805...|
| 271|[4935, 5716, 4309...|
| 272|[2717, 4363, 4088...|
| 273|[1165, 5013, 5110...|
| 274|[3920, 5310, 4024...|
| 275|[4366, 3373, 1587...|
| 276|[2277, 5251, 4806...|
| 277|[1068, 3495, 6194...|
| 278|[1145, 667, 2650,...|
+----+--------------------+
only showing top 20 rows



In [None]:
def _key_with_length(row):
    """Map a (key, values) row to (key, number of entries in values)."""
    return (row[0], len(row[1]))


# Map step: replace each row's value list by its length.
rdd2 = df_spark.rdd.map(_key_with_length)
df2 = rdd2.toDF(["key", "len"])
df2.show()

+----+---+
| key|len|
+----+---+
|5988| 47|
|5989| 39|
|5982| 41|
|5983| 13|
|5980| 23|
|5981| 16|
|5986|141|
|5987| 80|
|5984| 40|
|5985| 18|
|6294| 12|
| 270| 41|
| 271|  8|
| 272| 44|
| 273| 57|
| 274|409|
| 275| 46|
| 276| 14|
| 277| 15|
| 278|122|
+----+---+
only showing top 20 rows



In [None]:
# Reduce step: add up the lengths of all rows that share a key.
# The resulting column is named "sum(len)".
x = (
    df2
    .groupBy('key')
    .sum('len')
)
x.show()

+----+--------+
| key|sum(len)|
+----+--------+
| 691|       5|
|1159|      10|
|3959|     141|
|1572|      34|
|2294|      13|
|1090|       3|
|3606|     170|
|3414|       6|
| 296|      16|
|4821|      15|
|2162|      40|
|1436|       8|
|1512|      10|
|6194|      13|
|6240|      10|
| 829|      36|
|2136|       5|
|5645|      19|
|2069|     262|
| 467|       0|
+----+--------+
only showing top 20 rows



In [None]:
# Five keys with the largest summed length, largest first.
# RDD.top only brings the five winning rows back to the driver, instead of
# sorting and collecting the whole dataset and then slicing it locally.
x.rdd.top(5, key=lambda row: row[1])

[Row(key='859', sum(len)=1929),
 Row(key='5306', sum(len)=1737),
 Row(key='2664', sum(len)=1524),
 Row(key='5716', sum(len)=1423),
 Row(key='6306', sum(len)=1391)]

In [None]:
# Print the summed length for each of the selected keys; lookup() returns the
# list of values stored under the key.
for lookup_key in ('1748', '5633', '3469'):
    print(x.rdd.lookup(lookup_key))

[129]
[29]
[118]
