# Setup

In [1]:
# Common imports
import os
import tarfile
from six.moves import urllib
from pyspark.context import SparkContext

# Initializing Spark
sc = SparkContext.getOrCreate()

# Get the data

We first use the text of the [Spark Wikipedia page (English version)](https://en.wikipedia.org/wiki/Apache_Spark).

Then, we consider a publication dataset containing information on over 3 million papers published in computer science conferences and journals (this data was derived from the DBLP system, maintained by Michael Ley at http://www.informatik.uni-trier.de/~ley/db/).

Finally, we will use sample files for the Spark ML [FP-Growth](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html#fp-growth) and [K-means](https://spark.apache.org/docs/latest/ml-clustering.html#k-means) examples.

In [2]:
SPARK_WIKI_URL = "https://www.dropbox.com/s/5ctx25rm2xtls30/spark_wiki.txt?raw=1"
SPARK_WIKI_PATH = "/tmp"
SPARK_WIKI_FILE_NAME = "spark_wiki.txt"

def fetch_spark_wiki(url=SPARK_WIKI_URL, path=SPARK_WIKI_PATH, file_name=SPARK_WIKI_FILE_NAME):
    """Download the Spark Wikipedia text into a local directory.

    Args:
        url: Address the text is downloaded from.
        path: Directory that receives the file; it is created when missing.
        file_name: Name given to the downloaded file inside ``path``.
    """
    destination = os.path.join(path, file_name)
    directory_exists = os.path.isdir(path)
    if not directory_exists:
        os.makedirs(path)
    urllib.request.urlretrieve(url, destination)

In [3]:
DBLP_TSV_URL = "https://www.dropbox.com/s/4s7do56blmf8cz8/dblp_tsv.tar.gz?raw=1"
DBLP_TSV_PATH = "/tmp"

def fetch_dblp_tsv(url=DBLP_TSV_URL, path=DBLP_TSV_PATH):
    """Download the DBLP TSV archive and unpack it into ``path``.

    The archive is saved as ``dblp_tsv.tar.gz`` inside ``path`` and extracted
    into that same directory; the downloaded archive is left on disk.

    Args:
        url: Address of the tar archive to download.
        path: Directory receiving both the archive and its extracted
            contents; it is created when missing.

    Raises:
        tarfile.TarError: If the downloaded file cannot be read as a tar
            archive, or if a member is rejected by the extraction filter.
    """
    if not os.path.isdir(path):
        os.makedirs(path)
    tgz_path = os.path.join(path, "dblp_tsv.tar.gz")
    urllib.request.urlretrieve(url, tgz_path)
    # The context manager closes the archive even when extraction raises.
    with tarfile.open(tgz_path) as dblp_tsv_tar:
        if hasattr(tarfile, "data_filter"):
            # The archive comes from the network: the "data" filter refuses
            # members that would be written outside of ``path``.
            dblp_tsv_tar.extractall(path=path, filter="data")
        else:
            # Older Python versions have no extraction filters; fall back to
            # the unfiltered extraction.
            dblp_tsv_tar.extractall(path=path)

In [6]:
SAMPLE_FPGROWTH_URL = "https://www.dropbox.com/s/fa1o9r67hqupbet/sample_fpgrowth.txt?raw=1"
SAMPLE_FPGROWTH_PATH = "/tmp"
SAMPLE_FPGROWTH_FILE_NAME = "sample_fpgrowth.txt"

def fetch_sample_fpgrowth(url=SAMPLE_FPGROWTH_URL, path=SAMPLE_FPGROWTH_PATH, file_name=SAMPLE_FPGROWTH_FILE_NAME):
    """Download the FP-Growth sample file into a local directory.

    Args:
        url: Address the sample file is downloaded from.
        path: Target directory; it is created when missing.
        file_name: Name of the file written inside ``path``.
    """
    directory_missing = not os.path.isdir(path)
    if directory_missing:
        os.makedirs(path)
    urllib.request.urlretrieve(url, os.path.join(path, file_name))

In [57]:
SAMPLE_FPGROWTH_WITH_DUPLICATES_URL = "https://www.dropbox.com/s/x21qd4ivrrus57c/sample_fpgrowth_with_duplicates.txt?raw=1"
SAMPLE_FPGROWTH_WITH_DUPLICATES_PATH = "/tmp"
SAMPLE_FPGROWTH_WITH_DUPLICATES_FILE_NAME = "sample_fpgrowth_with_duplicates.txt"

def fetch_sample_fpgrowth_with_duplicates(url=SAMPLE_FPGROWTH_WITH_DUPLICATES_URL, path=SAMPLE_FPGROWTH_WITH_DUPLICATES_PATH, file_name=SAMPLE_FPGROWTH_WITH_DUPLICATES_FILE_NAME):
    """Download the FP-Growth sample file that contains duplicated items.

    Args:
        url: Address the sample file is downloaded from.
        path: Target directory; it is created when missing.
        file_name: Name of the file written inside ``path``.
    """
    local_file = os.path.join(path, file_name)
    if not os.path.isdir(path):
        # First download into this location: build the directory tree.
        os.makedirs(path)
    urllib.request.urlretrieve(url, local_file)

In [64]:
SAMPLE_KMEANS_URL = "https://www.dropbox.com/s/w19zt1c9o41zh5j/sample_kmeans.txt?raw=1"
SAMPLE_KMEANS_PATH = "/tmp"
SAMPLE_KMEANS_FILE_NAME = "sample_kmeans.txt"

def fetch_sample_kmeans(url=SAMPLE_KMEANS_URL, path=SAMPLE_KMEANS_PATH, file_name=SAMPLE_KMEANS_FILE_NAME):
    """Download the K-means sample file into a local directory.

    Args:
        url: Address the sample file is downloaded from.
        path: Target directory; it is created when missing.
        file_name: Name of the file written inside ``path``.
    """
    already_there = os.path.isdir(path)
    if already_there is False:
        os.makedirs(path)
    saved_as = os.path.join(path, file_name)
    urllib.request.urlretrieve(url, saved_as)

In [4]:
fetch_spark_wiki()

In [5]:
fetch_dblp_tsv()

In [7]:
fetch_sample_fpgrowth()

In [58]:
fetch_sample_fpgrowth_with_duplicates()

In [65]:
fetch_sample_kmeans()

# PairRDDs Join

In [8]:
left = sc.parallelize([(1, "A"), (2, "B"), (3, "C")])
right = sc.parallelize([(1, "I"), (4, "II"), (5, "III")])
left.join(right).collect()

[(1, ('A', 'I'))]

In [9]:
left.leftOuterJoin(right).collect()

[(1, ('A', 'I')), (2, ('B', None)), (3, ('C', None))]

In [10]:
left.rightOuterJoin(right).collect()

[(1, ('A', 'I')), (4, (None, 'II')), (5, (None, 'III'))]

# Examples using the DBLP dataset

## Create an RDD from a data file containing the DBLP papers

In [11]:
papers = sc.textFile("/tmp/dblp_tsv/papers.tsv")

## Get the first paper

In [12]:
papers.first()

'0\tParallel Integer Sorting and Simulation Amongst CRCW Models.\t0\t607-619\thttp://dx.doi.org/10.1007/BF03036466'

## Get the total number of papers

In [13]:
papers.count()

3150923

## Filter papers containing the word "Solving" in the title

In [14]:
# Keep the lines that contain the case-sensitive substring "Solving".
# NOTE(review): the test runs against the whole raw TSV line, so a match in any
# column (not only the title) keeps the paper; the counts below rely on this.
papersContainingSolving = papers.filter(lambda line: "Solving" in line)

In [15]:
papersContainingSolving.count()

9461

## For those papers containing the word "Solving", return the number of distinct words in their titles

In [16]:
papersTokens = papersContainingSolving.map(lambda line: line.split("\t"))

In [17]:
papersTokens.flatMap(lambda tokens: tokens[1].split(" ")).distinct().count()

14191

## For those papers containing the word "Solving" compute the frequency of each word (i.e. word count)

In [18]:
papersTokens.flatMap(lambda tokens: tokens[1].split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()

[('Guarding', 3),
 ("Students'", 20),
 ('via', 216),
 ('Approach', 362),
 ('Problems', 986),
 ('Heuristic', 96),
 ('Trends', 1),
 ('Case-Based', 23),
 ('casting', 1),
 ('algorithm.', 111),
 ('Meta-Cognitive', 1),
 ('Example', 6),
 ('Errors.', 3),
 ('Activities.', 8),
 ('1', 5),
 ('function.', 9),
 ('mathematical', 14),
 ('Combinatorial', 70),
 ('Mode', 2),
 ('Saint-Venant', 1),
 ('Equations.', 289),
 ('Agents.', 17),
 ('Belief', 9),
 ('Satisfiability', 79),
 ('Quantum', 32),
 ('Issue', 6),
 ('Order', 43),
 ('Prentice-Hall', 1),
 ('penalty', 2),
 ('results.', 5),
 ('Tackling', 1),
 ('Learning.', 22),
 ('Genetic', 239),
 ('Discrete', 69),
 ('Bounded', 25),
 ('kinetic', 2),
 ('scattering.', 1),
 ('electrodeposition', 1),
 ('past', 1),
 ('overset', 1),
 ('master', 5),
 ('Liouville', 2),
 ('Voronoi', 6),
 ('Finite-Volume', 1),
 ('Space-Transformation', 1),
 ('Three-Dimensional', 10),
 ('Product', 13),
 ('Least', 29),
 ('Preconditioned', 16),
 ('Revisited.', 5),
 ('Performance', 61),
 ('Deco

## Find the largest number of words in the title of a paper containing "Solving"

In [19]:
papersTokens.map(lambda tokens: len(tokens[1].split(" "))) \
    .reduce(lambda a, b: max(a, b))

41

# PairRDDs join examples using the DBLP dataset

## Create an RDD from a data file containing the DBLP papers

In [20]:
papers = sc.textFile("/tmp/dblp_tsv/papers.tsv")

In [21]:
papers.first()

'0\tParallel Integer Sorting and Simulation Amongst CRCW Models.\t0\t607-619\thttp://dx.doi.org/10.1007/BF03036466'

## Create an RDD from a data file containing the DBLP venues

In [22]:
venues = sc.textFile("/tmp/dblp_tsv/venue.tsv")

In [23]:
venues.first()

'795226\tCoRR\t2014\t""\tabs/1409.0286\t""\t0'

## Join the papers and the venues (on the venue id), and get the first joined element

In [None]:
# Complete

In [28]:
joined.first()

('4',
 ('Schnelle Multiplikation von Polynomenüber Körpern der Charakteristik 2.',
  'Acta Inf.'))

## For the venues since year 2010 (inclusive), count the number of papers per venue. Return the venue name, venue year and number of papers published in the venue.

In [30]:
# Complete

[('IJDSST 2010', 4),
 ('CoRR 2011', 1),
 ('IJHPCN 2015', 7),
 ('J. Optimization Theory and Applications 2010', 12),
 ('CoRR 2014', 1),
 ('CoRR 2015', 1),
 ('CoRR 2013', 1),
 ('CoRR 2015', 1),
 ('Human Factors 2010', 10),
 ('Comp.-Aided Civil and Infrastruct. Engineering 2012', 6),
 ('Commun. ACM 2015', 28),
 ('TAAS 2014', 6),
 ('Softwaretechnik-Trends 2015', 22),
 ('CoRR 2012', 1),
 ('Concurrency and Computation: Practice and Experience 2013', 6),
 ('CoRR 2014', 1),
 ('ACM SIGSOFT Software Engineering Notes 2013', 20),
 ('IEICE Transactions 2012', 35),
 ('CoRR 2015', 1),
 ('CoRR 2015', 1),
 ('IJDSST 2011', 4),
 ('CoRR 2014', 1),
 ('CoRR 2013', 1),
 ('CoRR 2014', 1),
 ('CoRR 2010', 1),
 ('CoRR 2011', 1),
 ('IJDSST 2012', 4),
 ('CoRR 2014', 1),
 ('CoRR 2015', 1),
 ("Revue d'Intelligence Artificielle 2014", 6),
 ('Social Networks 2010', 5),
 ('IJHPCN 2012', 7),
 ('TAAS 2012', 5),
 ('CoRR 2011', 1),
 ('CoRR 2012', 1),
 ('CoRR 2014', 1),
 ('CoRR 2014', 1),
 ('CoRR 2014', 1),
 ('CoRR 2014', 

## Modify the above to group by venue name. Return the venue name and number of papers published in venues with such name.

In [31]:
# Complete

[('Graphs and Combinatorics', 689),
 ('JCDL', 530),
 ('Quantum Information&Computation', 426),
 ('Robotics and Autonomous Systems', 906),
 ('IEEE Trans. Mob. Comput.', 1048),
 ('SIAM J. Imaging Sciences', 427),
 ('ICMI', 477),
 ('IJEHMC', 136),
 ('eLearn Magazine', 353),
 ('C&RL', 427),
 ('Sci. Comput. Program.', 820),
 ('Adv. Comput. Math.', 313),
 ('EUROMICRO-SEAA', 404),
 ('J. Clinical Bioinformatics', 126),
 ('IJBIR', 95),
 ('Multimedia Tools Appl.', 1933),
 ('IWOCL', 34),
 ('CoSECivi', 43),
 ('Information Security Journal: A Global Perspective', 177),
 ('ESTImedia', 104),
 ('Journal of Geographical Systems', 121),
 ('ACIIDS (1)', 328),
 ('I. J. Network Security', 427),
 ('Multiscale Modeling&Simulation', 325),
 ('Computer Networks', 1642),
 ('NSS', 345),
 ('Foundations and Trends in Computer Graphics and Vision', 12),
 ('WISM (3)', 61),
 ('International Journal of Software Engineering and Knowledge Engineering',
  339),
 ('IRMJ', 108),
 ('DAFx', 44),
 ('WRT@ICSE', 19),
 ('Intell. 

# Sorting an RDD

In [32]:
pairs = sc.parallelize([(1, "Z"), (2, "Y"), (3, "X")])
pairs.sortByKey().collect()

[(1, 'Z'), (2, 'Y'), (3, 'X')]

In [33]:
pairs.sortByKey(False).collect()

[(3, 'X'), (2, 'Y'), (1, 'Z')]

In [34]:
pairs.sortBy(lambda x: x[0]).collect()

[(1, 'Z'), (2, 'Y'), (3, 'X')]

In [35]:
pairs.sortBy(lambda x: x[0], False).collect()

[(3, 'X'), (2, 'Y'), (1, 'Z')]

In [36]:
pairs.sortBy(lambda x: x[1]).collect()

[(3, 'X'), (2, 'Y'), (1, 'Z')]

# RDD sorting examples using the DBLP dataset

## Modify the previous example so that, for venues since year 2010 (inclusive), it returns each venue name together with the number of papers published in venues with that name, ordered by venue name

In [37]:
# Complete

[('"""CloudCom-Asia"', 29),
 ('#MSM', 64),
 ('25 Years GULP', 14),
 ('3DIC', 389),
 ('3DIMPVT', 129),
 ('3DOR', 107),
 ('3DTV-Conference', 175),
 ('3DUI', 293),
 ('3DV', 217),
 ('3DV (Workshops)', 18),
 ('3GSE', 11),
 ('3PGCIC', 445),
 ('40 Jahre Informatik @ Braunschweig', 17),
 ('4OR', 185),
 ('5GU', 52),
 ('A Tribute to Prof. Dr. Da Ruan', 64),
 ('A-TEST@SIGSOFT FSE', 7),
 ('A2CWiC', 69),
 ('A4Cloud', 13),
 ('AAAI', 2384),
 ('AAAI (Late-Breaking Developments)', 52),
 ('AAAI Fall Symposium: Advances in Cognitive Systems', 44),
 ('AAAI Fall Symposium: Artificial Intelligence for Gerontechnology', 12),
 ('AAAI Fall Symposium: Artificial Intelligence of Humor', 22),
 ('AAAI Fall Symposium: Building Representations of Common Ground with Intelligent Agents',
  7),
 ('AAAI Fall Symposium: Cognitive and Metacognitive Educational Systems', 18),
 ('AAAI Fall Symposium: Commonsense Knowledge', 23),
 ('AAAI Fall Symposium: Complex Adaptive Systems', 44),
 ('AAAI Fall Symposium: Computational Mo

## Modify the previous example so that, for venues since year 2010 (inclusive), it returns each venue name together with the number of papers published in venues with that name, in descending order of the number of published papers

In [38]:
# Complete

[('CoRR', 75914),
 ('ICASSP', 8918),
 ('IEICE Transactions', 8607),
 ('IGARSS', 8013),
 ('Applied Mathematics and Computation', 7483),
 ('ICC', 6614),
 ('Expert Syst. Appl.', 6570),
 ('NeuroImage', 6428),
 ('ICIP', 6016),
 ('ICRA', 5577),
 ('IACR Cryptology ePrint Archive', 5308),
 ('IROS', 5253),
 ('CDC', 4998),
 ('GLOBECOM', 4849),
 ('ISCAS', 4834),
 ('Neurocomputing', 4633),
 ('INTERSPEECH', 4529),
 ('Bioinformatics', 4302),
 ('ACC', 4138),
 ('SMC', 3953),
 ('European Journal of Operational Research', 3944),
 ('BMC Bioinformatics', 3894),
 ('ISIT', 3674),
 ('IEEE Transactions on Industrial Electronics', 3617),
 ('J. Comput. Physics', 3518),
 ('HICSS', 3500),
 ('Inf. Sci.', 3455),
 ('ICNC', 3350),
 ('WCNC', 3319),
 ('Wireless Personal Communications', 3285),
 ('IEEE Transactions on Signal Processing', 3269),
 ('IJCNN', 3184),
 ('VTC Spring', 3166),
 ('IEEE Transactions on Information Theory', 3161),
 ('IEEE Communications Letters', 3115),
 ('Computers&Mathematics with Applications', 

# DataFrames

In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower

spark = SparkSession \
    .builder \
    .getOrCreate()

In [40]:
papers_df = spark.read \
    .option("header", "false") \
    .option("delimiter", "\t") \
    .csv("/tmp/dblp_tsv/papers.tsv")

In [41]:
papers_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [42]:
papers_df.show()

+---+--------------------+---+-------+--------------------+
|_c0|                 _c1|_c2|    _c3|                 _c4|
+---+--------------------+---+-------+--------------------+
|  0|Parallel Integer ...|  0|607-619|http://dx.doi.org...|
|  1|Pattern Matching ...|  1|227-248|http://dx.doi.org...|
|  2|NP-complete Probl...|  1|171-178|http://dx.doi.org...|
|  3|On the Power of C...|  3|425-433|http://dx.doi.org...|
|  4|Schnelle Multipli...|  4|395-398|http://dx.doi.org...|
|  5|A characterizatio...|  5|  19-24|http://dx.doi.org...|
|  6|The Derivation of...|  6|595-632|http://dx.doi.org...|
|  7|Fifo Nets Without...|  7|  15-36|http://dx.doi.org...|
|  8|On the Complement...|  8|297-305|http://dx.doi.org...|
|  9|Equational weight...|  9|  29-52|http://dx.doi.org...|
| 10|Merged processes:...| 10|307-330|http://dx.doi.org...|
| 11|Verifying a simpl...| 11|199-228|http://dx.doi.org...|
| 12|A Three-Stage Con...|  1|197-206|http://dx.doi.org...|
| 13|The Expressive Po...| 13|447-452|ht

In [43]:
papers_df = papers_df.select(col("_c0").alias("paper_id"), col("_c1").alias("paper_name"), col("_c2").alias("fk_venue_id"))

In [44]:
papers_df.show()

+--------+--------------------+-----------+
|paper_id|          paper_name|fk_venue_id|
+--------+--------------------+-----------+
|       0|Parallel Integer ...|          0|
|       1|Pattern Matching ...|          1|
|       2|NP-complete Probl...|          1|
|       3|On the Power of C...|          3|
|       4|Schnelle Multipli...|          4|
|       5|A characterizatio...|          5|
|       6|The Derivation of...|          6|
|       7|Fifo Nets Without...|          7|
|       8|On the Complement...|          8|
|       9|Equational weight...|          9|
|      10|Merged processes:...|         10|
|      11|Verifying a simpl...|         11|
|      12|A Three-Stage Con...|          1|
|      13|The Expressive Po...|         13|
|      14|Calculi for Inter...|         14|
|      15|A Synthesis of Se...|         15|
|      16|A Workload Model ...|         16|
|      17|Gray visiting Mot...|         17|
|      18|Trace- and failur...|         18|
|      19|Branching Process...| 

In [45]:
venue_df = spark.read \
    .option("header", "false") \
    .option("delimiter", "\t") \
    .csv("/tmp/dblp_tsv/venue.tsv")

In [46]:
venue_df.show()

+-------+--------------------+----+----+--------------+-----+---+
|    _c0|                 _c1| _c2| _c3|           _c4|  _c5|_c6|
+-------+--------------------+----+----+--------------+-----+---+
| 795226|                CoRR|2014|null| abs/1409.0286| null|  0|
| 840379|Computational Bio...|2008|null|            32|    1|  0|
| 107610|Earth Science Inf...|2008|null|             1|    1|  0|
| 617896|              IJPEDS|2009|null|            24|    4|  0|
| 786188|                CoRR|2014|null| abs/1409.0289| null|  0|
| 277767|EURASIP J. Wirele...|2012|null|          2012| null|  0|
| 803082|                CoRR|2015|null|abs/1506.07062| null|  0|
| 824640|                CoRR|2008|null| abs/0812.0736| null|  0|
|1150895|Automatic Control...|2007|null|            41|    3|  0|
|1373655|J. of Management ...|2007|null|            23|    4|  0|
| 496575|Mathematical and ...|2009|null|            49|11-12|  0|
| 170887|              IJTMCC|2014|null|             2|    3|  0|
| 642760|I

In [47]:
venue_df = venue_df.select(col("_c0").alias("venue_id"), col("_c1").alias("venue_name"), col("_c2").alias("venue_year"))

In [48]:
venue_df.show()

+--------+--------------------+----------+
|venue_id|          venue_name|venue_year|
+--------+--------------------+----------+
|  795226|                CoRR|      2014|
|  840379|Computational Bio...|      2008|
|  107610|Earth Science Inf...|      2008|
|  617896|              IJPEDS|      2009|
|  786188|                CoRR|      2014|
|  277767|EURASIP J. Wirele...|      2012|
|  803082|                CoRR|      2015|
|  824640|                CoRR|      2008|
| 1150895|Automatic Control...|      2007|
| 1373655|J. of Management ...|      2007|
|  496575|Mathematical and ...|      2009|
|  170887|              IJTMCC|      2014|
|  642760|International Jou...|      2015|
|  810275|                CoRR|      2014|
|  828151|                CoRR|      2014|
|  851176|              IJHPCN|      2015|
| 1392052|J. Electronic Ima...|      2004|
|   90908|Informatics in Ed...|      2008|
| 1144593|SIAM J. Imaging S...|      2008|
|  809632|                CoRR|      2011|
+--------+-

In [49]:
joined_df = papers_df.join(venue_df, papers_df.fk_venue_id == venue_df.venue_id)

In [50]:
joined_df.show()

+--------+--------------------+-----------+--------+----------+----------+
|paper_id|          paper_name|fk_venue_id|venue_id|venue_name|venue_year|
+--------+--------------------+-----------+--------+----------+----------+
|       0|Parallel Integer ...|          0|       0| Acta Inf.|      1996|
|       1|Pattern Matching ...|          1|       1| Acta Inf.|      1983|
|       2|NP-complete Probl...|          1|       1| Acta Inf.|      1983|
|       3|On the Power of C...|          3|       3| Acta Inf.|      1982|
|       4|Schnelle Multipli...|          4|       4| Acta Inf.|      1977|
|       5|A characterizatio...|          5|       5| Acta Inf.|      2011|
|       6|The Derivation of...|          6|       6| Acta Inf.|      1987|
|       7|Fifo Nets Without...|          7|       7| Acta Inf.|      1988|
|       8|On the Complement...|          8|       8| Acta Inf.|      1978|
|       9|Equational weight...|          9|       9| Acta Inf.|      2012|
|      10|Merged processe

In [51]:
filtered_df = joined_df.filter(col("venue_year") == 2010).filter(lower(col("venue_name")).contains("sigmod"))
print("Count:", filtered_df.count())
filtered_df.show()

Count: 165
+--------+--------------------+-----------+--------+-------------+----------+
|paper_id|          paper_name|fk_venue_id|venue_id|   venue_name|venue_year|
+--------+--------------------+-----------+--------+-------------+----------+
|  204132|Beyond isolation:...|     204132|  204132|SIGMOD Record|      2010|
|  204281|The declarative i...|     204132|  204132|SIGMOD Record|      2010|
|  204377|Emerging multidis...|     204377|  204377|SIGMOD Record|      2010|
|  204386|Scientific data m...|     204377|  204377|SIGMOD Record|      2010|
|  204422|Elisa Bertino spe...|     204422|  204422|SIGMOD Record|      2010|
|  204444|SmartCIS: integra...|     204132|  204132|SIGMOD Record|      2010|
|  204492|Business intellig...|     204422|  204422|SIGMOD Record|      2010|
|  204516|Report on the fir...|     204132|  204132|SIGMOD Record|      2010|
|  204573|The chair's repor...|     204422|  204422|SIGMOD Record|      2010|
|  204697|The SIGMOD 2010 p...|     204422|  204422|S

In [52]:
from pyspark.sql.functions import udf

documentDF = spark.createDataFrame([
    ("Hi I heard about Spark", ),
    ("I wish Java could use case classes", ),
    ("I am an experienced Spark programmer", ),
    ("Looking for experienced Spark developers", ),
    ("Logistic regression models are neat", )
], ["text"])

def words_count(str):
    """Count the whitespace-separated words in a piece of text.

    Args:
        str: The text to analyse, or None (Spark hands None to a Python UDF
            for null cells). The name shadows the built-in ``str``; it is
            kept so existing keyword callers keep working.

    Returns:
        The number of words as an int, or None when the input is None.
    """
    if str is None:
        return None
    # split() without an argument collapses runs of whitespace and ignores
    # leading/trailing blanks, so "" and "a  b" give 0 and 2 instead of the
    # 1 and 3 that split(" ") would report.
    return len(str.split())

# Declare the return type explicitly: udf() defaults to a string return type,
# which would store the word counts as strings in the "words" column.
from pyspark.sql.types import IntegerType

words_count_udf = udf(words_count, IntegerType())

# Show each text next to its word count.
documentDF.select("text", words_count_udf(documentDF.text).alias("words")).show()

+--------------------+-----+
|                text|words|
+--------------------+-----+
|Hi I heard about ...|    5|
|I wish Java could...|    7|
|I am an experienc...|    6|
|Looking for exper...|    5|
|Logistic regressi...|    5|
+--------------------+-----+



# Frequent Pattern mining FP-growth

In [54]:
from pyspark.mllib.fpm import FPGrowth

data = sc.textFile("/tmp/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(" "))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=4)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

FreqItemset(items=['r'], freq=3)
FreqItemset(items=['r', 'x'], freq=2)
FreqItemset(items=['r', 'z'], freq=2)
FreqItemset(items=['z'], freq=5)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 't'], freq=2)
FreqItemset(items=['s', 't', 'x'], freq=2)
FreqItemset(items=['s', 't', 'x', 'z'], freq=2)
FreqItemset(items=['s', 't', 'z'], freq=2)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['s', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y'], freq=2)
FreqItemset(items=['s', 'y', 't'], freq=2)
FreqItemset(items=['s', 'y', 't', 'x'], freq=2)
FreqItemset(items=['s', 'y', 't', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 't', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'x'], freq=2)
FreqItemset(items=['s', 'y', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'z'], freq=2)
FreqItemset(items=['s', 'z'], freq=2)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'x'], freq=3)
FreqItemset(items=['t', 'x',

In [55]:
model = FPGrowth.train(transactions, minSupport=0.4, numPartitions=4)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

FreqItemset(items=['r'], freq=3)
FreqItemset(items=['z'], freq=5)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'x'], freq=3)
FreqItemset(items=['t', 'x', 'z'], freq=3)
FreqItemset(items=['t', 'z'], freq=3)
FreqItemset(items=['y'], freq=3)
FreqItemset(items=['y', 't'], freq=3)
FreqItemset(items=['y', 't', 'x'], freq=3)
FreqItemset(items=['y', 't', 'x', 'z'], freq=3)
FreqItemset(items=['y', 't', 'z'], freq=3)
FreqItemset(items=['y', 'x'], freq=3)
FreqItemset(items=['y', 'x', 'z'], freq=3)
FreqItemset(items=['y', 'z'], freq=3)


In [56]:
model = FPGrowth.train(transactions, minSupport=0.8, numPartitions=4)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

FreqItemset(items=['z'], freq=5)


In [59]:
# This cell is expected to fail: the file contains transactions with repeated
# items and FPGrowth.train rejects them ("Items in a transaction must be unique").
data = sc.textFile("/tmp/sample_fpgrowth_with_duplicates.txt")
# Each line is split on single spaces into the list of items of one transaction.
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=4)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

Py4JJavaError: An error occurred while calling o916.trainFPGrowthModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 105.0 failed 1 times, most recent failure: Lost task 0.0 in stage 105.0 (TID 557, localhost, executor driver): org.apache.spark.SparkException: Items in a transaction must be unique but got WrappedArray(r, z, p, h, k, p).
	at org.apache.spark.mllib.fpm.FPGrowth$$anonfun$7.apply(FPGrowth.scala:251)
	at org.apache.spark.mllib.fpm.FPGrowth$$anonfun$7.apply(FPGrowth.scala:248)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.mllib.fpm.FPGrowth.genFreqItems(FPGrowth.scala:257)
	at org.apache.spark.mllib.fpm.FPGrowth.run(FPGrowth.scala:221)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainFPGrowthModel(PythonMLLibAPI.scala:576)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Items in a transaction must be unique but got WrappedArray(r, z, p, h, k, p).
	at org.apache.spark.mllib.fpm.FPGrowth$$anonfun$7.apply(FPGrowth.scala:251)
	at org.apache.spark.mllib.fpm.FPGrowth$$anonfun$7.apply(FPGrowth.scala:248)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [60]:
# Wrapping the split items in set() drops repeated items within a transaction,
# which is what FPGrowth.train requires (items of a transaction must be unique).
transactions = data.map(lambda line: set(line.strip().split(' ')))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=4)
result = model.freqItemsets().collect()
# Print every frequent itemset together with its frequency.
for fi in result:
    print(fi)

FreqItemset(items=['r'], freq=3)
FreqItemset(items=['r', 'x'], freq=2)
FreqItemset(items=['r', 'z'], freq=2)
FreqItemset(items=['z'], freq=5)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 't'], freq=2)
FreqItemset(items=['s', 't', 'x'], freq=2)
FreqItemset(items=['s', 't', 'x', 'z'], freq=2)
FreqItemset(items=['s', 't', 'z'], freq=2)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['s', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y'], freq=2)
FreqItemset(items=['s', 'y', 't'], freq=2)
FreqItemset(items=['s', 'y', 't', 'x'], freq=2)
FreqItemset(items=['s', 'y', 't', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 't', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'x'], freq=2)
FreqItemset(items=['s', 'y', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'z'], freq=2)
FreqItemset(items=['s', 'z'], freq=2)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'x'], freq=3)
FreqItemset(items=['t', 'x',

# K-means

In [66]:
from numpy import array
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import Vectors

# Loads data: every line holds space-separated numbers that become one point.
data = sc.textFile("/tmp/sample_kmeans.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Cluster the data into two classes using KMeans
numClusters = 2
numIterations = 20
# Random initialization without a seed picks different starting centers on
# every run, so the cluster index assigned to a point could change between
# runs. Fixing the seed makes the predictions below reproducible (the indices
# themselves remain arbitrary labels).
kmeansSeed = 42
model = KMeans.train(parsedData, numClusters, maxIterations=numIterations,
                     initializationMode="random", seed=kmeansSeed)

## Predict the cluster index that a given point belongs to according to the model

In [67]:
model.predict(Vectors.dense(0.5, 9.3, 2.4))

1

In [68]:
model.predict(Vectors.dense(9.5, 9.3, 2.4))

0