# Spark Work by `Mr. Harshit Dawar`: RDDs, DataFrames & Spark SQL

## Importing the Required Libraries

In [1]:
import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

## Setting Up the Spark Configuration

In [2]:
# Run Spark locally (master "local" = a single worker thread) under a recognizable app name.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("Stream Processing Assignment")
)

In [3]:
# getOrCreate() returns the already-running SparkContext if there is one, so
# re-executing this cell no longer fails with "Cannot run multiple SparkContexts
# at once". Note: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

## Creating an RDD and Performing Transformations

In [16]:
# Sample names used throughout the RDD examples below.
names = "Harshit Raj Rishu Alex Paul".split()

In [17]:
# Distribute the Python list as an RDD.
# NOTE: this rebinds `names` from a list to an RDD, so re-running this cell on its
# own would hand the RDD back to parallelize(), which expects a local collection —
# re-run the list cell above first.
names = sc.parallelize(names)

In [18]:
# Display the values in the RDD: take(5) returns the first 5 elements to the
# driver as a Python list (here that is every element).
names.take(5)

['Harshit', 'Raj', 'Rishu', 'Alex', 'Paul']

In [19]:
# Transformation 1: Map — pair every name with a count of 1.
# Transformations are lazy: nothing is computed until an action runs.
count = names.map(lambda name: (name, 1))

In [20]:
# collect() is an action: it runs the map above and returns all pairs to the driver.
count.collect()

[('Harshit', 1), ('Raj', 1), ('Rishu', 1), ('Alex', 1), ('Paul', 1)]

In [21]:
# Transformation 2: Filter — keep only the names that start with "H".
# str.startswith is used instead of indexing x[0] so that an empty string is
# simply filtered out rather than raising IndexError inside a Spark task.
Initials = names.filter(lambda name: name.startswith("H"))

In [23]:
# Materialize the filtered RDD on the driver.
Initials.collect()

['Harshit']

In [24]:
# Transformation 3: FlatMap — each name expands into two separate elements (the
# name itself and the integer 1), and all of them are flattened into one RDD.
FMCount = names.flatMap(lambda name: [name, 1])

In [25]:
# Materialize the flattened RDD; names and 1s are interleaved in one flat list.
FMCount.collect()

['Harshit', 1, 'Raj', 1, 'Rishu', 1, 'Alex', 1, 'Paul', 1]

In [33]:
# Transformation 4: reduceByKey — sum the 1s for each name; collect() then runs the job.
# Every name occurs exactly once in `count`, so each total is 1.
(
    count
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('Harshit', 1), ('Raj', 1), ('Rishu', 1), ('Alex', 1), ('Paul', 1)]

In [34]:
# Transformation 5: Union — first build a second RDD of names; the union with
# `names` is performed in the next cell.
newNames = sc.parallelize([
    "Dawar",
    "Oberoi",
    "katy",
    "kelly kelly",
    "Rajveer",
])

In [41]:
# union() concatenates the two RDDs (newNames first, then names) without removing
# duplicates; take(10) fetches all 10 resulting elements.
newNames.union(names).take(10)

['Dawar',
 'Oberoi',
 'katy',
 'kelly kelly',
 'Rajveer',
 'Harshit',
 'Raj',
 'Rishu',
 'Alex',
 'Paul']

## Actions on an RDD

In [42]:
# Action 1: collect() — return every element of the RDD to the driver as a list
# (only appropriate for small RDDs like this one).
names.collect()

['Harshit', 'Raj', 'Rishu', 'Alex', 'Paul']

In [44]:
# Action 2: take(number of elements) — return the first n elements (here 3) to the driver.
names.take(3)

['Harshit', 'Raj', 'Rishu']

In [45]:
# Action 3: first() — return the first element of the RDD.
names.first()

'Harshit'

In [49]:
# Action 4: reduce() — join all names into one string separated by single spaces.
# NOTE(review): Spark documents the reduce function as needing to be commutative and
# associative; string concatenation is only associative, so the name order here relies
# on PySpark merging partition results in partition order — confirm before reusing.
names.reduce(lambda left, right: " ".join((left, right)))

'Harshit Raj Rishu Alex Paul'

In [50]:
# Action 5: count() — return the number of elements in the RDD.
names.count()

5

## Word Count

In [51]:
# The five sample names repeated five times, so each word should be counted 5 times.
data = ["Harshit", "Raj", "Rishu", "Alex", "Paul"] * 5

In [53]:
# Word count: map each word to (word, 1), sum the 1s per word, and bring the
# (word, total) pairs back to the driver.
(
    sc.parallelize(data)
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('Harshit', 5), ('Raj', 5), ('Rishu', 5), ('Alex', 5), ('Paul', 5)]

## Creating DataFrames from RDDs & Vice Versa

In [65]:
# A SparkSession is required to create DataFrames. The builder's getOrCreate()
# is the documented entry point: it reuses the already-running SparkContext
# (`sc`) and returns an existing session if one was created before, so
# re-running this cell is safe.
spark = SparkSession.builder.getOrCreate()

In [83]:
# Build a single-column DataFrame named "Names" from the names RDD.
# Each name is wrapped in a 1-tuple so every row has exactly one field. The old
# x.split(",") trick would turn any name containing a comma into a multi-field
# row that no longer matches the single "Names" column.
DataFrame = spark.createDataFrame(names.map(lambda name: (name,)), ["Names"])

In [84]:
# Print the DataFrame as a text table (show() displays up to 20 rows by default).
DataFrame.show()

+-------+
|  Names|
+-------+
|Harshit|
|    Raj|
|  Rishu|
|   Alex|
|   Paul|
+-------+



In [86]:
# Converting the DataFrame back into an RDD: `.rdd` yields an RDD of Row objects.
DataFrame.rdd.collect()

[Row(Names='Harshit'),
 Row(Names='Raj'),
 Row(Names='Rishu'),
 Row(Names='Alex'),
 Row(Names='Paul')]

## Loading a CSV into a Spark SQL DataFrame with an Explicit Schema & Displaying It

In [88]:
from pyspark.sql.types import StructType, StringType, IntegerType, StructField

In [89]:
# Explicit schema for trainLabels.csv: an image name (string) and a severity level (integer).
# NOTE(review): Spark's file-based sources generally read every column as nullable,
# so nullable=False may not actually be enforced when loading the CSV — verify.
schema = StructType([
    StructField(name="Image_Name", dataType=StringType(), nullable=False),
    StructField(name="SeverityLevel", dataType=IntegerType(), nullable=False),
])

In [92]:
# Location of trainLabels.csv. It can be overridden with the TRAIN_LABELS_PATH
# environment variable so the notebook runs on machines other than the author's;
# the default is the original absolute path.
TRAIN_LABELS_PATH = os.environ.get(
    "TRAIN_LABELS_PATH",
    "file:///Users/harshitdawar/Downloads/Sem 7/Stream Processing Lab/trainLabels.csv",
)

# header=True skips the CSV's header line. The explicit schema types SeverityLevel
# as an integer (without it, CSV columns are read as strings unless inferSchema is on).
dataset = spark.read.csv(TRAIN_LABELS_PATH, header=True, schema=schema)

In [95]:
# Preview the first 5 rows of the loaded dataset.
dataset.show(5)

+----------+-------------+
|Image_Name|SeverityLevel|
+----------+-------------+
|   10_left|            0|
|  10_right|            0|
|   13_left|            0|
|  13_right|            0|
|   15_left|            1|
+----------+-------------+
only showing top 5 rows



In [98]:
# Project just the severity column and preview its first 10 values.
dataset.select(dataset["SeverityLevel"]).show(n=10)

+-------------+
|SeverityLevel|
+-------------+
|            0|
|            0|
|            0|
|            0|
|            1|
|            2|
|            4|
|            4|
|            0|
|            1|
+-------------+
only showing top 10 rows



# Assignment Finished!