In [1]:
# Run this to start spark
#from pyspark import SparkContext
#sc = SparkContext()

# Alternative: use this to init spark session
from answers.answer import init_spark
spark = init_spark() # spark session (contains context and many other)
sc = spark.sparkContext # spark context

In [2]:
# RDD creation method 1: create RDD with new collection
sc.parallelize([1,2,3]).collect()

[1, 2, 3]

In [64]:
# Current work directory
import os
os.getcwd()

'D:\\ALL\\Work\\Coding\\Python\\soen471\\bigdata-la1-Fryingpannn'

In [17]:
# Good way to open, read and write to files. use "a" for append.
with open("test_file.txt", "w+") as f:
    for i in range(3):
        f.write("my number is %d\n" % i)
    f.seek(0)
    content = f.read()
    print(content)

my number is 0
my number is 1
my number is 2



In [4]:
# RDD creation method 2: create RDD from external dataset
sc.textFile("test_file.txt").collect()

['my number is 0', 'my number is 1', 'my number is 2']

In [9]:
# Collect, count and take actions. Use persist to cache.
rdd = spark.sparkContext.parallelize(i for i in range(10)).persist()
print(rdd.collect())
print(rdd.count())
print(rdd.take(2))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
10
[0, 1]


In [14]:
# flatMap: like map but returns a collection - maps one to many
rdd.flatMap(lambda x: [x*2, x*3] if x % 2 == 0 else []).collect()

[0, 0, 4, 6, 8, 12, 12, 18, 16, 24]

In [16]:
# map
rdd.map(lambda x: x*x).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [17]:
# filter: return those that are True
rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6, 8]

In [19]:
# reduceByKey(func): aggregate the values of each key (values are summed up)
data = [(i % 3, i) for i in range(5)]
print(data)
rdd2 = sc.parallelize(data)
rdd2.reduceByKey(lambda x, y: x+y).collect()

[(0, 0), (1, 1), (2, 2), (0, 3), (1, 4)]


[(0, 3), (1, 5), (2, 2)]

In [35]:
# groupByKey(): aggregate each key's values into an iterable.
listgroup = rdd2.groupByKey().collect()
for (a,b) in listgroup:
    print(f"{a} : ", end="")
    for x in b:
        print(x, end=" ")
    print()

0 : 0 3 
1 : 1 4 
2 : 2 


In [38]:
# intersection and union
rddI = sc.parallelize([i for i in range(3)])
rddI2 = sc.parallelize([i for i in range(5)])
print(f"Intersection: {rddI.intersection(rddI2).collect()}")
print(f"Union: {rddI.union(rddI2).collect()}")

Intersection: [0, 1, 2]
Union: [0, 1, 2, 0, 1, 2, 3, 4]


In [39]:
# distinct: like a set, return unique elements
rdddup = sc.parallelize([5] * 4)
print(rdddup.collect())
print(rdddup.distinct().collect())

[5, 5, 5, 5]
[5]


In [40]:
# zipWithIndex(): assign index to all elements
rdd.zipWithIndex().collect()

[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9)]

In [46]:
# Create shell session from session. (spark = session here) and connect to csv
# Dataframes structured like SQL tables (RDDs)
df_session = spark._create_shell_session()
df = df_session.read.csv("./data/frenepublicinjection2015.csv", header=True, mode="DROPMALFORMED")

In [47]:
# df.show() to show first lines
# df.select() to get columns (returns dataframe)
# df.where("condition") to filter rows only True
# df.groupBy() to group rows together with some function e.g.: df.groupBy("columName").sum()
# df.orderBy("columnNames"..) to order df with columns
# limit(nb) to drop all rows after nbth row
# createDataFrame(rdd)
# join() combine df based on common column e.g.: df1.join(df2, df2.park_name == df1.park_name)
# toDF("newColName") to rename column namesof df
df.select("Rue", "Invent").collect()

[Row(Rue='Rue Saint-Hubert', Invent='R'),
 Row(Rue='Rue Foucher', Invent='R'),
 Row(Rue='Rue Taché', Invent='R'),
 Row(Rue='Rue Prieur Est', Invent='R'),
 Row(Rue='Avenue du Sacré-Coeur', Invent='R'),
 Row(Rue='Avenue Millen', Invent='R'),
 Row(Rue='Rue Parthenais', Invent='R'),
 Row(Rue='Avenue Papineau', Invent='R'),
 Row(Rue='Rue Basile-Routhier', Invent='R'),
 Row(Rue='Rue Basile-Routhier', Invent='R'),
 Row(Rue='Avenue Joseph-Melançon', Invent='R'),
 Row(Rue='Rue de Louvain Est', Invent='R'),
 Row(Rue='Rue de Louvain Est', Invent='R'),
 Row(Rue='Avenue Joseph-Melançon', Invent='R'),
 Row(Rue='Rue Prieur Est', Invent='R'),
 Row(Rue='Rue Prieur Est', Invent='R'),
 Row(Rue='Rue Prieur Est', Invent='R'),
 Row(Rue='Avenue Vianney', Invent='R'),
 Row(Rue='Rue Taché', Invent='R'),
 Row(Rue='Rue Taché', Invent='R'),
 Row(Rue='Rue Taché', Invent='R'),
 Row(Rue='Rue Taché', Invent='R'),
 Row(Rue='Rue Taché', Invent='R'),
 Row(Rue='Rue Saint-Hubert', Invent='R'),
 Row(Rue='Rue Saint-Hubert',

In [78]:
import csv
   # with open("./data/frenepublicinjection2016.csv", newline="", encoding='UTF-8') as csvfile:

def test(filename):
    with open(filename, encoding='UTF-8') as csvfile:
        lines = csv.reader(csvfile, delimiter=",")
        parknames_set = set()
        for line in lines:
            if line[6] and line[6] != "<Null>" and line[6] != "Null":
                parknames_set.add(line[6] + "\n")
    parknames_set.remove("Nom_parc\n")
    parknames = list(parknames_set)
    parknames.sort()
    return "".join(parknames)

In [79]:
print(test("./data/frenepublicinjection2016.csv"))

36E/DALBÉ-VIAU, ALLÉE PIÉTONIÈRE
ACHILLE-FORTIER, PARC
ADRIEN-D.-ARCHAMBAULT, PARC
AHUNTSIC (C.E.P.G.M.), PARC-ÉCOLE
AHUNTSIC, PARC
ALBERT-CHAMBERLAND, PARC
ALBERT-GARIÉPY, PARC
ALBERT-PERRAS, PARC
ALEXIS-CARREL, PARC
ALFRED-SAUVÉ, PARC
ALFREDO-F.CAMPOCATELLI, PARC
ALICE-NOLIN/CLAUDE-GAUVREAU, PARC
ALLARD/CARDINAL, PARC
ANCIENNE-PÉPINIÈRE, PARC
ANDERSON, PARC
ANDRÉ-CORBEIL-DIT-TRANCHEMONTAGNE, PROMENADE
ANDRÉ-LAURENDEAU, PARC
ANGRIGNON, PARC
ARAGON, PARC
ARCHIE-WILCOX, PARC
ARMAND-BOMBARDIER, PARC
ARTHUR-GÉNÉREUX/JOVETTE-BERNIER, PARC
ARÉNA HENRI-BOURASSA, ESPACE VERT
ARÉNA SAINT MICHEL, ESPACE VERT
ATELIER DE LA COUR CÔTE-DES-NEIGES, ESPACE VERT
ATWATER, MARCHÉ PUBLIC
AVILA-VIDAL, PARC
AVONMORE, TERRE-PLEIN AVENUE
AZILDA/ÉRIC, PARC
BALDWIN, PARC
BASILE-PATENAUDE, JARDIN COMMUNAUTAIRE
BASILE-ROUTHIER, PARC
BATELIERS, PROMENADE DES
BEAULAC, PARC
BEAUMONT/DE L'ÉPÉE, PARC
BEAUSÉJOUR, PARC DE
BELLERIVE, PARC
BELLERIVE/41E AVENUE, PARC
BELMONT, PARC
BELVÉDÈRE, PARC DU
BENNY, PARC-ÉCOLE
BER,

In [75]:
!pytest tests/test_uniq_parks.py

platform win32 -- Python 3.6.12, pytest-6.2.1, py-1.10.0, pluggy-0.13.1
rootdir: D:\ALL\Work\Coding\Python\soen471\bigdata-la1-Fryingpannn, configfile: pytest.ini
collected 1 item

tests\test_uniq_parks.py F                                               [100%]

_______________________________ test_uniq_parks _______________________________

    def test_uniq_parks():
        a = uniq_parks("./data/frenepublicinjection2016.csv")
        try:
            out = open("tests/list_parks.txt","r").read()
>           assert(a == out)
E           AssertionError: assert '36E/DALBÉ-VI...LINER, PARC\n' == '36E/DALBÃ‰-V...LINER, PARC\n'
E             - 36E/DALBÃ‰-VIAU, ALLÃ‰E PIÃ‰TONIÃˆRE
E             ?         ^^          ^^    ^^    ^^
E             + 36E/DALBÉ-VIAU, ALLÉE PIÉTONIÈRE
E             ?         ^          ^    ^    ^
E               ACHILLE-FORTIER, PARC
E               ADRIEN-D.-ARCHAMBAULT, PARC
E             - AHUNTSIC (C.E.P.G.M.), PARC-Ã‰COLE...
E             
E             ...

In [153]:
with open("./data/frenepublicinjection2016.csv", encoding='UTF-8') as csvfile:
    lines = csv.reader(csvfile)
    pairs = {}
    print(len(lines))
    for line in lines:
        if line[6] and line[6] != "<Null>" and line[6] != "Null":
            if line[6] not in pairs:
                pairs[line[6]] = 1
            else:
                pairs[line[6]] += 1
pairs.pop("Nom_parc")
result = ""
pairs = [(k, v) for k, v in pairs.items()]
pairs.sort(key=lambda tup: tup[1], reverse=True)
top_ten = [tup for tup in pairs[:10]]
for k, v in top_ten:
    result += k + "," + str(v) + "\n"
print(result)

TypeError: object of type '_csv.reader' has no len()

In [162]:

with open("./data/frenepublicinjection2015.csv", encoding='UTF-8') as csvfile, open("./data/frenepublicinjection2016.csv", encoding='UTF-8') as csvfile2:
    lines = csv.reader(csvfile)
    lines2 = csv.reader(csvfile2)
    parknames_set = set()
    parknames_set2 = set()
    for line in lines:
        if line[6] and line[6] != "<Null>" and line[6] != "Null":
            parknames_set.add(line[6] + "\n")
    for line in lines2:
        if line[6] and line[6] != "<Null>" and line[6] != "Null":
            parknames_set2.add(line[6] + "\n")
    parknames_set.remove("Nom_parc\n")
    parknames_set2.remove("Nom_parc\n")

intersection = parknames_set.intersection(parknames_set2)
result = list(intersection)
result.sort()
print("".join(result))

{'ATWATER, MARCHÉ PUBLIC\n', 'BERTRAND, PARC\n', 'Espace Vert Bertrand\n'}
ATWATER, MARCHÉ PUBLIC
BERTRAND, PARC
Espace Vert Bertrand

