# Spark Core Demo

In [1]:
from pyspark.sql import SparkSession

# "local[*]" runs Spark on this machine with as many worker threads as there
# are logical cores available. The appName is optional, but it makes this
# application easy to pick out when several applications run at the same time
# on the same Spark cluster.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('My First Spark application')
    .getOrCreate()
)

# The SparkSession is the entry point for the DataFrame API (spark.read.json
# later in this notebook); its SparkContext, `sc`, is the entry point for the
# lower-level RDD API (sc.textFile / sc.parallelize in the cells that follow).
sc = spark.sparkContext

# RDDs (Week 2)

In [2]:
# Define an RDD whose elements are the lines of blurb.txt. textFile is lazy:
# the file is only read once an action (take / first below) runs.
lines = sc.textFile('../non_auto_assignments/data/misc/blurb.txt')

In [3]:
# Keep only the lines that mention 'data' (a lazy transformation: no work yet).
data_lines = lines.filter(lambda line: 'data' in line)

In [4]:
# take(10) is an action: it returns at most the first 10 matching lines as a
# Python list (only 3 lines of blurb.txt contain 'data').
data_lines.take(10)

['The University of Michigan School of Information is pleased to offer a new, fully online, master’s program in applied data science. We teach comprehensive applied data science at the intersection of people and technology. ',
 'We provide critical insight into data collection, computation, and analytics, and help develop hands-on skills using a multidisciplinary approach embedded in information, computer science, and statistics. ',
 'Coursework and projects focus on applying data science to real-world problems. ']

In [5]:
# first() returns only the first matching line rather than a list.
data_lines.first()

'The University of Michigan School of Information is pleased to offer a new, fully online, master’s program in applied data science. We teach comprehensive applied data science at the intersection of people and technology. '

In [6]:
def data_filter(s):
    """Return True when the line `s` contains the substring 'data'.

    A named-function equivalent of the lambda predicate used earlier; RDD.filter
    accepts any callable that returns a truthy value for elements to keep.
    """
    keep = 'data' in s
    return keep


# Same filter as before, expressed with a named predicate instead of a lambda.
data_lines = lines.filter(data_filter)

In [7]:
nums = sc.parallelize([1, 2, 3, 4])

# map() is a lazy transformation; collect() is the action that runs it and
# returns the results to the driver as a Python list.
squared = nums.map(lambda n: n * n).collect()

for value in squared:
    print('%i ' % value)

1 
4 
9 
16 


In [9]:
# Sometimes we want several output elements for each input element; flatMap()
# does that. As with map(), the function passed to flatMap() is called once for
# each element of the input RDD, but it returns an iterable of results instead
# of a single value. Rather than an RDD of iterables, we get back one flat RDD
# containing the elements of all of them.
#
# This toy RDD gets its own name so it does not shadow `lines` (the blurb.txt
# RDD used by the filter cells above). Reusing that name would make re-running
# those cells silently filter this data instead.
greeting_lines = sc.parallelize(["hello world", "hi", "hi world", "hello everyone", "hello"])
words = greeting_lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))
print(words.collect())
print(word_counts.collect())

['hello', 'world', 'hi', 'hi', 'world', 'hello', 'everyone', 'hello']
[('hello', 1), ('world', 1), ('hi', 1), ('hi', 1), ('world', 1), ('hello', 1), ('everyone', 1), ('hello', 1)]


In [10]:
# reduceByKey merges all values sharing a key pairwise with the given function;
# summing the 1s yields one (word, count) pair per distinct word.
wc = word_counts.reduceByKey(lambda left, right: left + right)

In [11]:
# Up to 10 (word, count) pairs. They are not in any sorted order; the
# following cells sort them by word and by count.
wc.take(10)

[('world', 2), ('hello', 3), ('hi', 2), ('everyone', 1)]

In [12]:
# Sort the (word, count) pairs alphabetically by word. sortByKey's first
# parameter is `ascending` (a bool), not a comparator, so pass it explicitly.
wc.sortByKey(ascending=True).collect()

[('everyone', 1), ('hello', 3), ('hi', 2), ('world', 2)]

In [13]:
# Order the (word, count) pairs by count, most frequent first, and keep 10.
wc.sortBy(keyfunc=lambda pair: pair[1], ascending=False).take(10)

[('hello', 3), ('world', 2), ('hi', 2), ('everyone', 1)]

In [16]:
# Lazily load a text file as an RDD of lines.
# NOTE(review): unlike the other inputs (under '../non_auto_assignments/data/'),
# this path points at a local 'data/' folder next to the notebook; confirm it
# exists when re-running from a fresh checkout.
input_file = sc.textFile("data/short.t1.txt")

In [17]:
# Classic word count as a single chain: split each line on whitespace, pair
# every word with 1, sum the 1s per word, then order by count (highest first).
# Chaining avoids numbered intermediates whose names did not match their
# contents (the first one held words, not counts).
word_counts_sorted = (
    input_file
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)

word_counts_sorted.take(10)

[('the', 128),
 ('of', 84),
 ('to', 40),
 ('is', 33),
 ('and', 30),
 ('in', 28),
 ('a', 27),
 ('that', 19),
 ('for', 18),
 ('was', 17)]

In [None]:
# Every Yelp academic dataset file lives in one directory and follows the
# naming scheme yelp_academic_dataset_<name>.json.gz.
YELP_DIR = '../non_auto_assignments/data/yelp_academic'


def read_yelp(name):
    """Load one gzipped JSON file of the Yelp academic dataset as a DataFrame.

    `name` is the part of the file name after 'yelp_academic_dataset_', e.g.
    'business' reads yelp_academic_dataset_business.json.gz from YELP_DIR.
    """
    return spark.read.json('%s/yelp_academic_dataset_%s.json.gz' % (YELP_DIR, name))


business = read_yelp('business')
checkin = read_yelp('checkin')
review = read_yelp('review')
tip = read_yelp('tip')
user = read_yelp('user')

In [None]:
# Print the schema (column names, types, nullability) inferred for review.
review.printSchema()

In [None]:
# Bring every user with more than 5000 'cool' compliments back to the driver;
# `result` is a Python list of Row objects (where() is an alias of filter()).
result = user.where(user['compliment_cool'] > 5000).collect()

In [None]:
# Fetch the first check-in row to eyeball its fields.
checkin.first()

In [None]:
# (column name, type string) pairs for the check-in DataFrame.
checkin.dtypes

In [None]:
# groupBy only builds a GroupedData object; nothing is computed until an
# aggregation (e.g. .count()) is applied to it.
# NOTE(review): the cells below treat checkin as having 'business_id' and
# 'date' columns; confirm 'weekday' and 'hour' actually exist before
# aggregating.
checkin.groupBy('weekday', 'hour')

In [None]:
from pyspark.sql.functions import udf

In [None]:
from pyspark.sql.types import IntegerType

# Square inline: no `square` helper is defined in this notebook, so calling it
# would raise NameError as soon as Spark evaluated the UDF. NULL (None) inputs
# map to None instead of raising TypeError on `None * None`.
square_udf_int = udf(lambda z: None if z is None else z * z, IntegerType())

In [None]:
from pyspark.sql.types import ArrayType, StringType

# UDF that splits a comma-separated string column into an array of strings
# (applied to checkin's 'date' column below). NULL (None) inputs stay NULL
# instead of raising AttributeError inside the Python worker.
# NOTE(review): elements keep any whitespace that follows each comma; strip
# them if the source separates values with ', '.
datesplit = udf(lambda x: None if x is None else x.split(','),
                ArrayType(StringType()))

In [None]:
# Preview the first check-in row with its 'date' string split into an array.
checkin.select(
    'business_id',
    datesplit('date').alias('dates'),
).first()

In [None]:
from pyspark.sql.functions import explode

In [None]:
# Split the 'date' string into an array, then explode it so each array element
# becomes its own row in 'checkin_date'. The result is bound to `d1` because
# the next cell inspects `d1.dtypes`, which previously had nothing to refer to.
d1 = (
    checkin
    .select('business_id', datesplit('date').alias('dates'))
    .withColumn('checkin_date', explode('dates'))
)
d1.take(10)

In [None]:
# Column names and types of `d1`, presumably the exploded check-in DataFrame
# (business_id, dates, checkin_date).
# NOTE(review): confirm `d1` is assigned before this cell runs.
d1.dtypes

In [None]:
# Convert the first collected user Row into a plain dict for readable display.
# NOTE(review): raises IndexError if no user passed the compliment_cool filter.
result[0].asDict()

In [None]:
# Same schema printout for `review` as the earlier cell; repeated here next to
# the other schema listings.
review.printSchema()

In [None]:
# Print the schema inferred for the tip DataFrame.
tip.printSchema()

In [None]:
# Print the schema inferred for the check-in DataFrame.
checkin.printSchema()

In [None]:
# Column names of the business DataFrame, as a Python list.
business.columns

In [None]:
# (column name, type string) pairs for the business DataFrame.
business.dtypes

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the columns of the
# `business` DataFrame loaded from the Yelp dataset; `businesses` is not defined.
business.describe().show()

In [None]:
# Print the schema of the `business` DataFrame (the name `businesses` is not
# defined anywhere in this notebook).
business.printSchema()