In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("rdd_demo").getOrCreate()

conf = SparkConf().setMaster("local").setAppName("MiniTempratures")
sc = SparkContext.getOrCreate()


In [3]:
spark

In [6]:
flightData2015 = spark.read.option("inferSchema","false").option("header","true").csv("2015-summary.csv")

In [7]:
flightData2015.take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count='15'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count='1'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count='344'),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count='15'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count='62')]

In [8]:
flightData2015

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: string]

In [9]:
type(flightData2015)

In [10]:
flightData2015.count()

256

In [12]:
##read using datasource API
flightData2015 = spark.read.option("inferSchema","false").option("header",'true').csv("2015-summary.csv")

##commands
flightData2015 = flightData2015.toDF("dest","source","count").rdd
print(type(flightData2015))
print(flightData2015.take(1))
print(flightData2015.count())

<class 'pyspark.rdd.RDD'>
[Row(dest='United States', source='Romania', count='15')]
256


In [15]:
###read using sparkContext
spth = "2015-summary.csv"
sc_flightData2015 = spark.sparkContext.textFile(spth)
print(type(sc_flightData2015))
print(sc_flightData2015.take(2))
print(sc_flightData2015.take(3))

<class 'pyspark.rdd.RDD'>
['DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count', 'United States,Romania,15']
['DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count', 'United States,Romania,15', 'United States,Croatia,1']


In [16]:
###convert pandas file to RDD

import pandas as pd

spth="2015-summary.csv"
pd_flightData2015 = pd.read_csv(spth,header = 0)
print(type(pd_flightData2015))
print(pd_flightData2015.head())
pd_flightData2015 = spark.createDataFrame(pd_flightData2015).rdd
print(type(pd_flightData2015))
print(pd_flightData2015.take(1))

<class 'pandas.core.frame.DataFrame'>
  DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME  count
0     United States             Romania     15
1     United States             Croatia      1
2     United States             Ireland    344
3             Egypt       United States     15
4     United States               India     62
<class 'pyspark.rdd.RDD'>
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)]


In [17]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(myCollection,2)
print(type(words))
print(words.collect())
print(words.take(10))

<class 'pyspark.rdd.RDD'>
['Spark', 'The', 'Definitive', 'Guide', ':', 'Big', 'Data', 'Processing', 'Made', 'Simple']
['Spark', 'The', 'Definitive', 'Guide', ':', 'Big', 'Data', 'Processing', 'Made', 'Simple']


In [18]:
###from collection of text
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(myCollection,2)

words.setName("myWords")
print(words.name())
print(type(words))
print(words.take(5))
words.getNumPartitions()

myWords
<class 'pyspark.rdd.RDD'>
['Spark', 'The', 'Definitive', 'Guide', ':']


2

In [19]:
##from a range of numbers
myRange = spark.range(1000000000000).toDF("numbers").rdd.map(lambda row: row[0])

print(myRange.take(10))
print(type(myRange))
myRange.getNumPartitions()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
<class 'pyspark.rdd.PipelinedRDD'>


1

In [20]:
###flitering

def parseLine(line):
  fields = line.split(',')
  date = fields[0]
  p_open = fields[1]
  p_close = fields[5]
  return (date,p_open,p_close)

In [21]:
spth = "RELIANCE.csv"
sdt = spark.sparkContext.textFile(spth)
sdt =sdt.map(parseLine)
sdt.take(2)

[('Date', 'Open', 'Close'), ('2022-01-01', '100', '10000')]

In [25]:
spth = "RELIANCE.csv"
o_sdt = spark.read.format("CSV").option("inferSchema","true").option("header","true").load(spth)
o_sdt = o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
print(type(o_sdt))
print(o_sdt.take(5))

o_sdt = o_sdt.filter(lambda row: row[2] > row[1])
print(o_sdt.take(5))
print(type(o_sdt))
print(o_sdt.count())

5
<class 'pyspark.rdd.PipelinedRDD'>
[(datetime.date(2022, 1, 1), 100, 10000), (datetime.date(2022, 1, 2), 200, 20000), (datetime.date(2022, 1, 3), 300, 30000), (datetime.date(2022, 1, 4), 400, 40000), (datetime.date(2022, 1, 5), 500, 50000)]
[(datetime.date(2022, 1, 1), 100, 10000), (datetime.date(2022, 1, 2), 200, 20000), (datetime.date(2022, 1, 3), 300, 30000), (datetime.date(2022, 1, 4), 400, 40000), (datetime.date(2022, 1, 5), 500, 50000)]
<class 'pyspark.rdd.PipelinedRDD'>
5


In [26]:
##filter function

def HighClose(row):
  if(row[2] > row[1]):
    return row

In [28]:
o_sdt = spark.read.format("CSV").option("header","true").load(spth)
o_sdt = o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
o_sdt = o_sdt.filter(lambda row: HighClose(row))
print(o_sdt.take(5))
print(type(o_sdt))
print(o_sdt.count())

5
[('2022-01-01', '100', '10000'), ('2022-01-02', '200', '20000'), ('2022-01-03', '300', '30000'), ('2022-01-04', '400', '40000'), ('2022-01-05', '500', '50000')]
<class 'pyspark.rdd.PipelinedRDD'>
5


In [29]:
#maps
def to_to_mill(row):
    return (row[0], row[1], row[2], round(row[3],0))


In [31]:
spth = "RELIANCE.csv"
o_sdt = spark.read.format("CSV").option("header","true").option("inferSchema","true").load(spth)
o_sdt = o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5], row[7]))
print(o_sdt.take(2))
o_sdt = o_sdt.map(to_to_mill)
print(type(o_sdt))

[(datetime.date(2022, 1, 1), 100, 10000, None), (datetime.date(2022, 1, 2), 200, 20000, None)]
<class 'pyspark.rdd.PipelinedRDD'>


In [38]:
#flatmap
def Func(lines):
    lines = lines.lower()
    lines = lines.split(" ")
    return lines

#sc.stop()
#conf = SparkConf().setMaster("local").setAppName("wordcount")
#sc = SparkContext.getOrCreate()

spth="sherlock_holmes.txt"
input_file = sc.textFile(spth)

In [39]:
input_file.take(1)

["Project Gutenberg's The Adventures of Sherlock Holmes, by Arthur Conan Doyle"]

In [40]:
rdd1 = input_file.flatMap(Func)
rdd1.take(5)

['project', "gutenberg's", 'the', 'adventures', 'of']

In [41]:
rdd1.map(lambda x: (x,1)).take(5)

[('project', 1), ("gutenberg's", 1), ('the', 1), ('adventures', 1), ('of', 1)]

In [42]:
rdd2 = rdd1.map(lambda x: (x,1)).groupByKey().mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)
rdd2.take(5)

[(5704, 'the'), (3145, ''), (2882, 'and'), (2756, 'of'), (2719, 'to')]

In [43]:
##reduce
spark.sparkContext.parallelize(range(1,5)).reduce(lambda x, y: x+y)

10