#### ** (1) Open and inspect the file **
#### We open hadoop_1m.txt file in which we know from earlier work on MapReduce
#### We also inspect its contents

In [3]:
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('hadoop_1m.txt')
fileName = os.path.join(baseDir, inputPath)

hadoop_rdd = (sc
              .textFile(fileName))

print hadoop_rdd.count()
print hadoop_rdd.take(5)

1000000
[u'From general-return-2133-apmail-hadoop-general-archive=hadoop.apache.org@hadoop.apache.org Fri Oct 01 15:07:28 2010', u'Return-Path: <general-return-2133-apmail-hadoop-general-archive=hadoop.apache.org@hadoop.apache.org>', u'Delivered-To: apmail-hadoop-general-archive@minotaur.apache.org', u'Received: (qmail 98655 invoked from network); 1 Oct 2010 15:07:28 -0000', u'Received: from unknown (HELO mail.apache.org) (140.211.11.3)']


#### ** (2) Count email domains **
#### Compare to Hadoop MapReduce code from earlier lectures

In [20]:
domain_counts = (hadoop_rdd
                 .filter(lambda line: line.find("From:") == 0)
                 .map(lambda line: (line[line.find("@")+1:line.find(">")],1))
                 .reduceByKey(lambda a, b: a+b))

print domain_counts.count()
print domain_counts.take(5)

269
[(u'nokia.com', 2), (u'sabalcore.com', 3), (u'cs.ucf.ed', 6), (u'cloudera.com', 442), (u'schiessle.org', 5)]


#### Comapre reduceByKey to reduce
#### domain_counts above finishes with reduce, we see below that the result of such operation is an RDD, so it's a transformation
#### If we run reduce on domain_counts it becomes just a number which is returned to the driver, so it's an action, like take or count above

In [21]:
print domain_counts

print domain_counts.map(lambda (a, b): b).reduce(lambda a, b: a+b)

PythonRDD[63] at RDD at PythonRDD.scala:43
11424


#### (3) Spark SQL
####Load library and data

In [81]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
baseDir = os.path.join('data')
inputPath = os.path.join('hadoop_1m.csv')
fileName = os.path.join(baseDir, inputPath)

hadoop_csv_rdd = (sc
                  .textFile(fileName))

emails = (hadoop_csv_rdd
         .map(lambda l: l.split(","))
         .map(lambda p: Row(id=p[0], list=p[1], date1=p[2], date2=p[3], email=p[4], subject=p[5])))

schemaEmails = sqlContext.createDataFrame(emails)
schemaEmails.registerTempTable("emails")

In [82]:
emails.take(5)

[Row(date1=u'"Fri', date2=u' 1 Oct 2010 12:06:54 -0300"', email=u'marcoscba@gmail.com', id=u'AANLkTin6ZUrWYUscAXmq0bOG8mWA35xcAfvLD-jYVzwu@mail.gmail.com', list=u' general.hadoop.apache.org', subject=u'"Is it possible write directly to datanode\'s?"'),
 Row(date1=u'"Fri', date2=u' 1 Oct 2010 21:27:22 -0700"', email=u'rmalviya@apple.com', id=u'1C4C43FE-05F2-464B-9F04-AC972988C326@apple.com', list=u' general.hadoop.apache.org', subject=u'"Total Space Available on Hadoop Cluster Or Hadoop version of "df"."'),
 Row(date1=u'"Sat', date2=u' 2 Oct 2010 16:13:18 +1000"', email=u'Glenn.Gore@melbourneit.com.au', id=u'C586A8404F8B304E807B4B8BF73B376EA600AC@WIC001MITEBCLV1.messaging.mit', list=u' general.hadoop.apache.org', subject=u'"RE: Total Space Available on Hadoop Cluster Or Hadoop version of "df"."'),
 Row(date1=u'"Sat', date2=u' 2 Oct 2010 08:12:48 -0400"', email=u'palmercliff@gmail.com', id=u'AANLkTimtEM4yzQSC5F16j4r0eqd0MeDb+7FF3FT1v3Yk@mail.gmail.com', list=u' general.hadoop.apache.org'

In [107]:
lists = sqlContext.sql("SELECT COUNT(DISTINCT(email)) FROM emails")
lists.collect()

[Row(c0=627)]

#### In case of problems check if every line has enough fields
#### Spark SQL is more sensitive to missing data than Hive

In [86]:
(sc.textFile("./data/hadoop_1m.csv")
    .map(lambda line: line.split(","))
    .filter(lambda line: len(line)<6)
    .collect())

[]

#### Original query from 101 should work from Spark 1.5

In [None]:
results = sqlContext.sql("SELECT substr(email, locate('@', email)+1), count(substr(email,locate('@', email)+1)) FROM emails GROUP BY substr(email,locate('@', email)+1)")
results.collect()