In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pyspark.sql import SparkSession
# Get the active SparkSession, or create one running locally ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
spark

In [3]:
from pyspark import SparkContext
# Reuse the SparkContext owned by the session; it is the entry point for the
# RDD API used in the rest of the notebook.
sc = spark.sparkContext

In [4]:
# Load the dataset as an RDD of strings, one element per line of the file.
raw_data = sc.textFile("DollarDataset.txt")
# Preview the first ten lines to check the layout of the fields.
raw_data.take(10)

['1\t02-01-1950\t2,80',
 '2\t03-01-1950\t2,80',
 '3\t04-01-1950\t2,80',
 '4\t05-01-1950\t2,80',
 '5\t06-01-1950\t2,80',
 '6\t09-01-1950\t2,80',
 '7\t10-01-1950\t2,80',
 '8\t11-01-1950\t2,80',
 '9\t12-01-1950\t2,80',
 '10\t13-01-1950\t2,80']

In [5]:
# Number of lines read from the file, before any cleaning.
raw_data.count()

17776

In [6]:
def splitting(line):
    """Parse one tab-separated line into a ``(date, price)`` pair of strings.

    The second field is taken as the date and the third as the price. In the
    price, every "." is removed and "," is replaced by ".", so a value such
    as "1.234,56" becomes "1234.56" (a form ``float()`` accepts). The price
    is returned as a string; no numeric conversion happens here.

    Surrounding whitespace is stripped from each field. A missing field is
    returned as an empty string instead of raising ``IndexError``, so short
    or blank lines can be removed by filtering on an empty price.

    Args:
        line: One line of the input file.

    Returns:
        A ``(date, price)`` tuple of strings; either may be ``""``.
    """
    fields = [field.strip() for field in line.split("\t")]
    # Pad to three fields so short or blank lines yield "" rather than crash.
    fields += [""] * (3 - len(fields))
    date1 = fields[1]
    price = fields[2].replace(".", "").replace(",", ".")
    return (date1, price)

# Turn every raw line into a (date, price) pair of strings.
splitted = raw_data.map(splitting)
# Preview the first three parsed records.
splitted.take(3)

[('02-01-1950', '2.80'), ('03-01-1950', '2.80'), ('04-01-1950', '2.80')]

In [7]:
# Keep only the records that actually carry a price (second element non-empty).
splitted = splitted.filter(lambda record: record[1] != '')
splitted.take(10)

[('02-01-1950', '2.80'),
 ('03-01-1950', '2.80'),
 ('04-01-1950', '2.80'),
 ('05-01-1950', '2.80'),
 ('06-01-1950', '2.80'),
 ('09-01-1950', '2.80'),
 ('10-01-1950', '2.80'),
 ('11-01-1950', '2.80'),
 ('12-01-1950', '2.80'),
 ('13-01-1950', '2.80')]

In [8]:
# Number of records left after dropping those with an empty price.
splitted.count()

17190

In [9]:
# Attach a positional index to every record and move it to the front so it
# can serve as the key: (record, index) -> (index, record).
# NOTE: this rebinds `splitted`, so running the cell a second time would
# index the already-indexed records.
splitted = splitted.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
splitted.take(10)

[(0, ('02-01-1950', '2.80')),
 (1, ('03-01-1950', '2.80')),
 (2, ('04-01-1950', '2.80')),
 (3, ('05-01-1950', '2.80')),
 (4, ('06-01-1950', '2.80')),
 (5, ('09-01-1950', '2.80')),
 (6, ('10-01-1950', '2.80')),
 (7, ('11-01-1950', '2.80')),
 (8, ('12-01-1950', '2.80')),
 (9, ('13-01-1950', '2.80'))]

In [10]:
# Convert the price string of every record to a float; the index key and the
# date are passed through unchanged.
to_float = splitted.mapValues(lambda record: (record[0], float(record[1])))
to_float.take(10)

[(0, ('02-01-1950', 2.8)),
 (1, ('03-01-1950', 2.8)),
 (2, ('04-01-1950', 2.8)),
 (3, ('05-01-1950', 2.8)),
 (4, ('06-01-1950', 2.8)),
 (5, ('09-01-1950', 2.8)),
 (6, ('10-01-1950', 2.8)),
 (7, ('11-01-1950', 2.8)),
 (8, ('12-01-1950', 2.8)),
 (9, ('13-01-1950', 2.8))]

In [11]:
# Copy of to_float with every key shifted forward by one, so that key k here
# holds the record stored under key k-1 in to_float.
diff1 = to_float.map(lambda entry: (entry[0] + 1, entry[1]))
diff1.take(20)

[(1, ('02-01-1950', 2.8)),
 (2, ('03-01-1950', 2.8)),
 (3, ('04-01-1950', 2.8)),
 (4, ('05-01-1950', 2.8)),
 (5, ('06-01-1950', 2.8)),
 (6, ('09-01-1950', 2.8)),
 (7, ('10-01-1950', 2.8)),
 (8, ('11-01-1950', 2.8)),
 (9, ('12-01-1950', 2.8)),
 (10, ('13-01-1950', 2.8)),
 (11, ('16-01-1950', 2.8)),
 (12, ('17-01-1950', 2.8)),
 (13, ('18-01-1950', 2.8)),
 (14, ('19-01-1950', 2.8)),
 (15, ('20-01-1950', 2.8)),
 (16, ('23-01-1950', 2.8)),
 (17, ('24-01-1950', 2.8)),
 (18, ('25-01-1950', 2.8)),
 (19, ('26-01-1950', 2.8)),
 (20, ('27-01-1950', 2.8))]

In [12]:
# Join on the index: for key k the value is (record k, record k-1), i.e. each
# record paired with the one before it. Keys present on only one side are
# dropped by the join. Sorted by key so the preview is in index order.
joinedRDD = (
    to_float
    .join(diff1)
    .sortByKey()
)
joinedRDD.take(10)

[(1, (('03-01-1950', 2.8), ('02-01-1950', 2.8))),
 (2, (('04-01-1950', 2.8), ('03-01-1950', 2.8))),
 (3, (('05-01-1950', 2.8), ('04-01-1950', 2.8))),
 (4, (('06-01-1950', 2.8), ('05-01-1950', 2.8))),
 (5, (('09-01-1950', 2.8), ('06-01-1950', 2.8))),
 (6, (('10-01-1950', 2.8), ('09-01-1950', 2.8))),
 (7, (('11-01-1950', 2.8), ('10-01-1950', 2.8))),
 (8, (('12-01-1950', 2.8), ('11-01-1950', 2.8))),
 (9, (('13-01-1950', 2.8), ('12-01-1950', 2.8))),
 (10, (('16-01-1950', 2.8), ('13-01-1950', 2.8)))]

In [13]:
# Percentage change of each price relative to the previous record's price.
# Every joined value is ((date, price), (previous_date, previous_price)); the
# result is (date, (price - previous_price) / previous_price * 100).
# A previous price of 0.0 would raise ZeroDivisionError here.
resultRDD = joinedRDD.map(
    lambda x: (x[1][0][0], ((x[1][0][1] - x[1][1][1]) / x[1][1][1]) * 100)
)
# Five largest increases, in descending order. top() avoids the full sort of
# the RDD that sortBy(...).take(5) performs.
resultRDD.top(5, key=lambda x: x[1])

[('22-08-1960', 221.42857142857144),
 ('25-01-1980', 100.0),
 ('10-08-1970', 64.99999999999999),
 ('23-02-2001', 39.75657690281898),
 ('06-04-1994', 38.88985856101814)]