# Spark Core Demo

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('My First Spark application') \
    .getOrCreate() 

sc = spark.sparkContext

# RDDs (Week 2)

In [2]:
lines = sc.textFile('../non_auto_assignments/data/misc/blurb.txt')

In [3]:
data_lines = lines.filter(lambda x: 'data' in x)

In [4]:
data_lines.take(10)

['The University of Michigan School of Information is pleased to offer a new, fully online, master’s program in applied data science. We teach comprehensive applied data science at the intersection of people and technology. ',
 'We provide critical insight into data collection, computation, and analytics, and help develop hands-on skills using a multidisciplinary approach embedded in information, computer science, and statistics. ',
 'Coursework and projects focus on applying data science to real-world problems. ']

In [5]:
data_lines.first()

'The University of Michigan School of Information is pleased to offer a new, fully online, master’s program in applied data science. We teach comprehensive applied data science at the intersection of people and technology. '

In [6]:
data_lines.count()

3

In [7]:
data_lines.collect()

['The University of Michigan School of Information is pleased to offer a new, fully online, master’s program in applied data science. We teach comprehensive applied data science at the intersection of people and technology. ',
 'We provide critical insight into data collection, computation, and analytics, and help develop hands-on skills using a multidisciplinary approach embedded in information, computer science, and statistics. ',
 'Coursework and projects focus on applying data science to real-world problems. ']

In [36]:
%%time
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print('%i ' % (num))


1 
4 
9 
16 
Wall time: 7.77 s


In [9]:
import multiprocessing
import psutil
print("processor count: " + str(multiprocessing.cpu_count()))
psutil.virtual_memory()

processor count: 20


svmem(total=68463136768, available=55353417728, percent=19.1, used=13109719040, free=55353417728)

In [10]:
#Sometimes we want to produce multiple output elements for each input element.
#The operation to do this is called flatMap(). As with map(), the function we
#provide to flatMap() is called individually for each element in our input RDD.
#Instead of returning a single element, we return an iterator with our return 
#values. Rather than producing an RDD of iterators, we get back an RDD that
#consists of the elements from all of the iterators.

lines = sc.parallelize(["hello world", "hi", "hi world", "hello everyone", "hello"])
words = lines.flatMap(lambda line: line.split(" "))
print(words.collect()) 

word_map = lines.map(lambda line:line.split(' '))
print(word_map.collect()) 

word_counts = words.map(lambda word: (word,1))
print(word_counts.collect())

['hello', 'world', 'hi', 'hi', 'world', 'hello', 'everyone', 'hello']
[['hello', 'world'], ['hi'], ['hi', 'world'], ['hello', 'everyone'], ['hello']]
[('hello', 1), ('world', 1), ('hi', 1), ('hi', 1), ('world', 1), ('hello', 1), ('everyone', 1), ('hello', 1)]


In [11]:
wc = word_counts.reduceByKey(lambda accumulator,value: accumulator + value)

In [12]:
wc.take(10)

[('hello', 3), ('world', 2), ('hi', 2), ('everyone', 1)]

In [13]:
wc.sortByKey(lambda x,y: x+y).collect()

[('everyone', 1), ('hello', 3), ('hi', 2), ('world', 2)]

In [14]:
wc.sortBy(lambda x: x[1], ascending=False).take(10)

[('hello', 3), ('world', 2), ('hi', 2), ('everyone', 1)]

In [16]:
business = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_business.json.gz')
checkin = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_checkin.json.gz')
# review = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_review.json.gz')
tip = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_tip.json.gz')
# user = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_user.json.gz')

In [None]:
review.printSchema()

In [None]:
result = user.filter(user['compliment_cool'] > 5000).collect()

In [17]:
checkin.first()

Row(business_id='--1UhMGODdWsrMastO9DZw', date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02')

In [18]:
checkin.dtypes

[('business_id', 'string'), ('date', 'string')]

In [19]:
checkin.groupby(['weekday','hour'])

<pyspark.sql.group.GroupedData at 0x2ebf1025348>

In [20]:
from pyspark.sql.functions import udf

In [21]:
from pyspark.sql.types import IntegerType
square_udf_int = udf(lambda z: square(z), IntegerType())

In [22]:
from pyspark.sql.types import ArrayType,StringType
datesplit = udf(lambda x: x.split(','),ArrayType(StringType()))

In [23]:
(checkin.select('business_id',datesplit('date').alias('dates'))).first()

Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'])

In [24]:
from pyspark.sql.functions import explode

In [25]:
checkin.select('business_id',datesplit('date').alias('dates')).withColumn('checkin_date',explode('dates')).take(10)

[Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'], checkin_date='2016-04-26 19:49:16'),
 Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'], checkin_date=' 2016-08-30 18:36:57'),
 Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'], checkin_date=' 2016-10-15 02:45:18'),
 Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'], checkin_date=' 2016-11-18 01:54:50'),
 Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', 

In [26]:
d1.dtypes

NameError: name 'd1' is not defined

In [27]:
result[0].asDict()

NameError: name 'result' is not defined

In [None]:
review.printSchema()

In [28]:
tip.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



In [29]:
checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [30]:
business.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'postal_code',
 'review_count',
 'stars',
 'state']

In [31]:
business.dtypes

[('address', 'string'),
 ('attributes',
  'struct<AcceptsInsurance:string,AgesAllowed:string,Alcohol:string,Ambience:string,BYOB:string,BYOBCorkage:string,BestNights:string,BikeParking:string,BusinessAcceptsBitcoin:string,BusinessAcceptsCreditCards:string,BusinessParking:string,ByAppointmentOnly:string,Caters:string,CoatCheck:string,Corkage:string,DietaryRestrictions:string,DogsAllowed:string,DriveThru:string,GoodForDancing:string,GoodForKids:string,GoodForMeal:string,HairSpecializesIn:string,HappyHour:string,HasTV:string,Music:string,NoiseLevel:string,Open24Hours:string,OutdoorSeating:string,RestaurantsAttire:string,RestaurantsCounterService:string,RestaurantsDelivery:string,RestaurantsGoodForGroups:string,RestaurantsPriceRange2:string,RestaurantsReservations:string,RestaurantsTableService:string,RestaurantsTakeOut:string,Smoking:string,WheelchairAccessible:string,WiFi:string>'),
 ('business_id', 'string'),
 ('categories', 'string'),
 ('city', 'string'),
 ('hours',
  'struct<Friday:st

In [34]:
business.describe().show()

+-------+-------------------+--------------------+--------------------+---------+-------------------+-----------------+------------------+--------------------+------------------+------------------+------------------+------+
|summary|            address|         business_id|          categories|     city|            is_open|         latitude|         longitude|                name|       postal_code|      review_count|             stars| state|
+-------+-------------------+--------------------+--------------------+---------+-------------------+-----------------+------------------+--------------------+------------------+------------------+------------------+------+
|  count|             192609|              192609|              192127|   192609|             192609|           192609|            192609|              192609|            192609|            192609|            192609|192609|
|   mean|               null|                null|                null|     null| 0.8230404602069478|38.

In [35]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str