In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
Conf = SparkConf().setMaster("local[*]").setAppName("MyApp")
sc = SparkContext.getOrCreate(conf=Conf)

In [3]:
# Load the data 
raw_content = sc.textFile("2015-12-12.csv")

# Print the type of the object
print(type(raw_content))

# Print the number of lines
print(raw_content.count())

<class 'pyspark.rdd.RDD'>
421970


In [4]:
# Print first five elements
print(raw_content.take(5))

['"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"', '"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1', '"2015-12-12","13:24:37",1236751,"3.2.2","x86_64","mingw32","RJSONIO","1.3-0","DE",2', '"2015-12-12","13:42:35",2077876,"3.2.2","i386","mingw32","UsingR","2.0-5","CZ",1', '"2015-12-12","13:42:01",266724,"3.2.2","i386","mingw32","gridExtra","2.0.0","CZ",1']


Split or replace operation by map

In [5]:
content = raw_content.map(lambda x: x.split(","))
content.take(3)

[['"date"',
  '"time"',
  '"size"',
  '"r_version"',
  '"r_arch"',
  '"r_os"',
  '"package"',
  '"version"',
  '"country"',
  '"ip_id"'],
 ['"2015-12-12"',
  '"13:42:10"',
  '257886',
  '"3.2.2"',
  '"i386"',
  '"mingw32"',
  '"HistData"',
  '"0.7-6"',
  '"CZ"',
  '1'],
 ['"2015-12-12"',
  '"13:24:37"',
  '1236751',
  '"3.2.2"',
  '"x86_64"',
  '"mingw32"',
  '"RJSONIO"',
  '"1.3-0"',
  '"DE"',
  '2']]

In [6]:
def clean(x):
    return ([xx.replace('"','') for xx in x])

In [7]:
content = content.map(clean)
content.take(3)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1'],
 ['2015-12-12',
  '13:24:37',
  '1236751',
  '3.2.2',
  'x86_64',
  'mingw32',
  'RJSONIO',
  '1.3-0',
  'DE',
  '2']]

In [8]:
tmp_content = raw_content.map(lambda x: x.split(",")).map(clean)
tmp_content.take(3)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1'],
 ['2015-12-12',
  '13:24:37',
  '1236751',
  '3.2.2',
  'x86_64',
  'mingw32',
  'RJSONIO',
  '1.3-0',
  'DE',
  '2']]

In [9]:
text = ['a b c','d e','f g h']
tmp1 = sc.parallelize(text).map(lambda x: x.split(" ")).collect()
print(tmp1)

[['a', 'b', 'c'], ['d', 'e'], ['f', 'g', 'h']]


In [10]:
tmp2 = sc.parallelize(text).flatMap(lambda x: x.split(" ")).collect()
print(tmp2)

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']


In [11]:
package_count = content.map(lambda x: (x[6],1)).reduceByKey(lambda a,b: a+b)
package_count.count()

8660

In [12]:
package_count.take(5)

[('HistData', 159),
 ('UsingR', 151),
 ('lme4', 1560),
 ('testthat', 1178),
 ('maps', 1586)]

In [13]:
package_count_2 = content.map(lambda x: (x[6],1)).countByKey()
print(type(package_count_2))

<class 'collections.defaultdict'>


In [14]:
package_count_2['ggplot2']

3913

In [15]:
package_count_2['stm']

25

In [17]:
package_count.map(lambda x: (x[1],x[0])).sortByKey(0).take(10)

[(4783, 'Rcpp'),
 (3913, 'ggplot2'),
 (3748, 'stringi'),
 (3449, 'stringr'),
 (3436, 'plyr'),
 (3265, 'magrittr'),
 (3223, 'digest'),
 (3205, 'reshape2'),
 (3046, 'RColorBrewer'),
 (3007, 'scales')]

In [18]:
content.filter(lambda x: x[6] == 'Rtts' and x[8] == 'CN').count()

1

In [19]:
content.filter(lambda x: x[6] == 'Rtts' and x[8] == 'CN').take(1)

[['2015-12-12',
  '20:15:24',
  '23820',
  '3.2.2',
  'x86_64',
  'mingw32',
  'Rtts',
  '0.3.3',
  'CN',
  '41']]

In [20]:
raw_content.count()

421970

In [21]:
raw_content.union(raw_content).count()

843940

In [22]:
raw_content.distinct().count()

421553

In [24]:
raw_content.intersection(raw_content).count()

421553

In [25]:
content_modified = content.map(lambda x: (x[8],x))

In [26]:
mapping = [('DE','Germany'),('US','United States'),('CN','China'),('IN','India')]
mapping = sc.parallelize(mapping)

In [27]:
content_modified.join(mapping).takeSample(False,8)

[('CN',
  (['2015-12-12',
    '03:48:48',
    '14271402',
    '3.2.3',
    'x86_64',
    'mingw32',
    'stringi',
    '1.0-1',
    'CN',
    '533'],
   'China')),
 ('US',
  (['2015-12-12',
    '08:57:26',
    '3595507',
    '3.2.2',
    'x86_64',
    'mingw32',
    'maps',
    '3.0.1',
    'US',
    '990'],
   'United States')),
 ('CN',
  (['2015-12-12',
    '21:02:13',
    '514',
    'NA',
    'NA',
    'NA',
    'MethComp',
    '1.3',
    'CN',
    '1809'],
   'China')),
 ('US',
  (['2015-12-12',
    '05:10:49',
    '917417',
    '3.2.3',
    'x86_64',
    'mingw32',
    'RCurl',
    '1.95-4.7',
    'US',
    '5716'],
   'United States')),
 ('US',
  (['2015-12-12',
    '03:05:22',
    '144913',
    '3.2.3',
    'x86_64',
    'linux-gnu',
    'R6',
    '2.1.1',
    'US',
    '7761'],
   'United States')),
 ('CN',
  (['2015-12-12',
    '18:15:40',
    '14616',
    'NA',
    'NA',
    'NA',
    'xlsxjars',
    '0.6.1',
    'CN',
    '483'],
   'China')),
 ('CN',
  (['2015-12-12',
    '