In [1]:
import pyspark
sc = pyspark.SparkContext('spark://spark-master:7077')

In [2]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('./data/country-list.csv')

In [4]:
counts = country_lines.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

In [5]:
counts.count()

455

In [6]:
country_lines.count()

211

In [7]:
country_lines.collect()

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND',
 'Angola, ANG',
 'Anguilla, AIA',
 'Antigua and Barbuda, ATG',
 'Argentina, ARG',
 'Armenia, ARM',
 'Aruba, ARU',
 'Australia, AUS',
 'Austria, AUT',
 'Azerbaijan, AZE',
 'Bahamas, BAH',
 'Bahrain, BHR',
 'Bangladesh, BAN',
 'Barbados, BRB',
 'Belarus, BLR',
 'Belgium, BEL',
 'Belize, BLZ',
 'Benin, BEN',
 'Bermuda, BER',
 'Bhutan, BHU',
 'Bolivia, BOL',
 'Bosnia and Herzegovina, BIH',
 'Botswana, BOT',
 'Brazil, BRA',
 'British Virgin Islands, VGB',
 'Brunei, BRU',
 'Bulgaria, BUL',
 'Burkina Faso, BFA',
 'Burundi, BDI',
 'Cambodia, CAM',
 'Cameroon, CMR',
 'Canada, CAN',
 'Cape Verde, CPV',
 'Cayman Islands, CAY',
 'Central African Republic, CTA',
 'Chad, CHA',
 'Chile, CHI',
 'China PR, CHN',
 'Chinese Taipei, TPE',
 'Colombia, COL',
 'Comoros, COM',
 'Congo, CGO',
 'DR Congo, COD',
 'Cook Islands, COK',
 'Costa Rica, CRC',
 'Croatia, CRO',
 'Cuba, CUB',
 'Curaçao, CUW',
 'Cyprus, CYP',
 

In [8]:
# Convert each line into a pair of words
country_lines = country_lines.map(lambda line: line.split(", "))

In [9]:
country_lines.collect()

[['Afghanistan', 'AFG'],
 ['Albania', 'ALB'],
 ['Algeria', 'ALG'],
 ['American Samoa', 'ASA'],
 ['Andorra', 'AND'],
 ['Angola', 'ANG'],
 ['Anguilla', 'AIA'],
 ['Antigua and Barbuda', 'ATG'],
 ['Argentina', 'ARG'],
 ['Armenia', 'ARM'],
 ['Aruba', 'ARU'],
 ['Australia', 'AUS'],
 ['Austria', 'AUT'],
 ['Azerbaijan', 'AZE'],
 ['Bahamas', 'BAH'],
 ['Bahrain', 'BHR'],
 ['Bangladesh', 'BAN'],
 ['Barbados', 'BRB'],
 ['Belarus', 'BLR'],
 ['Belgium', 'BEL'],
 ['Belize', 'BLZ'],
 ['Benin', 'BEN'],
 ['Bermuda', 'BER'],
 ['Bhutan', 'BHU'],
 ['Bolivia', 'BOL'],
 ['Bosnia and Herzegovina', 'BIH'],
 ['Botswana', 'BOT'],
 ['Brazil', 'BRA'],
 ['British Virgin Islands', 'VGB'],
 ['Brunei', 'BRU'],
 ['Bulgaria', 'BUL'],
 ['Burkina Faso', 'BFA'],
 ['Burundi', 'BDI'],
 ['Cambodia', 'CAM'],
 ['Cameroon', 'CMR'],
 ['Canada', 'CAN'],
 ['Cape Verde', 'CPV'],
 ['Cayman Islands', 'CAY'],
 ['Central African Republic', 'CTA'],
 ['Chad', 'CHA'],
 ['Chile', 'CHI'],
 ['China PR', 'CHN'],
 ['Chinese Taipei', 'TPE'],
 ['

In [10]:
# Convert each pair of words into a tuple
country_tuples = country_lines.map(lambda line: (line[0], line[1]))

In [11]:
country_tuples.collect()

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND'),
 ('Angola', 'ANG'),
 ('Anguilla', 'AIA'),
 ('Antigua and Barbuda', 'ATG'),
 ('Argentina', 'ARG'),
 ('Armenia', 'ARM'),
 ('Aruba', 'ARU'),
 ('Australia', 'AUS'),
 ('Austria', 'AUT'),
 ('Azerbaijan', 'AZE'),
 ('Bahamas', 'BAH'),
 ('Bahrain', 'BHR'),
 ('Bangladesh', 'BAN'),
 ('Barbados', 'BRB'),
 ('Belarus', 'BLR'),
 ('Belgium', 'BEL'),
 ('Belize', 'BLZ'),
 ('Benin', 'BEN'),
 ('Bermuda', 'BER'),
 ('Bhutan', 'BHU'),
 ('Bolivia', 'BOL'),
 ('Bosnia and Herzegovina', 'BIH'),
 ('Botswana', 'BOT'),
 ('Brazil', 'BRA'),
 ('British Virgin Islands', 'VGB'),
 ('Brunei', 'BRU'),
 ('Bulgaria', 'BUL'),
 ('Burkina Faso', 'BFA'),
 ('Burundi', 'BDI'),
 ('Cambodia', 'CAM'),
 ('Cameroon', 'CMR'),
 ('Canada', 'CAN'),
 ('Cape Verde', 'CPV'),
 ('Cayman Islands', 'CAY'),
 ('Central African Republic', 'CTA'),
 ('Chad', 'CHA'),
 ('Chile', 'CHI'),
 ('China PR', 'CHN'),
 ('Chinese Taipei', 'TPE'),
 ('

In [12]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [14]:
# Read tweets CSV file into RDD of lines
tweets_lines = sqlContext.read.json('./data/tweets.json')

In [15]:
tweets_lines.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- source: string (nullable = true)
 |-- tweet_ID: string (nullable = true)
 |-- tweet_followers_count: long (nullable = true)
 |-- tweet_mentioned_count: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- CreatedAt: struct (nullable = true)
 |    |    |-- $date: string (nullable = true)
 |    |-- FavouritesCount: long (nullable = true)
 |    |-- FollowersCount: long (nullable = true)
 |    |-- FriendsCount: long (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- UserId: long (nullable = true)
 |-- user_name: string (nullable = true)



In [16]:
tweets_lines.registerTempTable("tweets")