In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.streaming import StreamingContext

app_name = "prac"

conf = SparkConf().setAppName(app_name).setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
spark = SparkSession.builder.appName(app_name).getOrCreate()

22/05/01 16:51:47 WARN Utils: Your hostname, jl-hp resolves to a loopback address: 127.0.1.1; using 192.168.31.13 instead (on interface wlo1)
22/05/01 16:51:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/01 16:51:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from random import random
from operator import add

partitions = 1
n = 10000000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count/n))

[Stage 0:>                                                          (0 + 1) / 1]

Pi is roughly 3.141162


                                                                                

In [3]:
from datetime import datetime, date

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print(df)

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]


In [4]:
# Sample Quiz Q1: read from text file
rdd = sc.textFile('data.txt').map(lambda line: tuple(line.split()))
cols = ['Last Name', 'First Name', 'Age', 'Gender', 'State of Residence', 'Education level']
df2 = rdd.toDF(cols)
df2.printSchema()
df2.show(truncate=False)

df2.createOrReplaceTempView("data")
query = '''
SELECT * FROM data
WHERE `State of Residence` = 'ny' AND Age >= 21
'''
sqlDF = spark.sql(query)
sqlDF.show()

root
 |-- Last Name: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- State of Residence: string (nullable = true)
 |-- Education level: string (nullable = true)

+---------+----------+---+------+------------------+---------------+
|Last Name|First Name|Age|Gender|State of Residence|Education level|
+---------+----------+---+------+------------------+---------------+
|john     |john      |19 |male  |ny                |bachelor       |
|amy      |amy       |23 |female|ny                |master         |
|bob      |bob       |16 |male  |ca                |highSchool     |
+---------+----------+---+------+------------------+---------------+

+---------+----------+---+------+------------------+---------------+
|Last Name|First Name|Age|Gender|State of Residence|Education level|
+---------+----------+---+------+------------------+---------------+
|      amy|       amy| 23|female|                n

In [5]:
# Sample Quiz Q2: PageRank
import re
from typing import Iterable, Tuple


def computeContribs(urls: Iterable[str], rank: float) -> Iterable[Tuple[str, float]]:
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls: str) -> Tuple[str, str]:
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

iterations = 10

lines = spark.read.text('link.txt').rdd.map(lambda r: r[0])
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

for iteration in range(iterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(
        url_urls_rank[1][0], url_urls_rank[1][1]  # type: ignore[arg-type]
    ))
    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))

c.net has rank: 0.7464008076748408.
d.co has rank: 0.7844406865236148.
a.com has rank: 0.838990182786046.
b.com has rank: 0.4001096809515733.
e.co has rank: 0.4001096809515733.
bit.ly has rank: 0.8105766856306423.


In [6]:
# Sample Quiz Q3: Spark Streams
# Reference: https://www.toptal.com/apache/apache-spark-streaming-twitter
'''
# Code of twitter stream (producer, send side)
import socket
import sys
import requests
import requests_oauthlib
import json

ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
CONSUMER_KEY = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    print(query_url, response)
    return response

def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet['text']
            print("Tweet Text: " + tweet_text)
            print ("------------------------------------------")
            tcp_connection.send(tweet_text + '\n')
        except:
            e = sys.exc_info()[0]
            print("Error: %s" % e)
            
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)
'''

'''Receive side, consumer'''
hashtag_of_interest = '#cloudcomputing'

ssc = StreamingContext(sc, 2)
ssc.checkpoint("TwitterStreamingProcCkpt")
dataStream = ssc.socketTextStream("localhost", 9009)

# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))

# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1)
hashtags = words.filter(lambda w: hashtag_of_interest in w).map(lambda x: (x, 1))

# adding the count of each hashtag to its last count
def aggregate_tags_count(new_values, total_sum):
    return sum(new_values) + (total_sum or 0)

tags_totals = hashtags.updateStateByKey(aggregate_tags_count)

# do processing for each RDD generated in each interval
def get_sql_context_instance(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']

def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    try:
        sql_context = get_sql_context_instance(rdd.context)

        # convert the RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))

        # create a DF from the Row RDD
        hashtags_df = sql_context.createDataFrame(row_rdd)

        # Register the dataframe as table
        hashtags_df.registerTempTable("hashtags")

        # get the top 10 hashtags from the table using SQL and print them
        hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
        hashtag_counts_df.show()
    except:
        print("Error receiving from the stream!")

tags_totals.foreachRDD(process_rdd)

# start the streaming computation
ssc.start()

# wait for the streaming to finish
try:
    ssc.awaitTermination()
except KeyboardInterrupt:
    ssc.stop()

22/05/01 16:52:18 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9009 - java.net.ConnectException: Connection refused (Connection refused)
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at java.base/java.net.Socket.connect(Socket.java:558)
	at java.base/java.net.Socket.<init>(Socket.java:454)
	at java.base/java.net.Socket.<init>(Socket.java:231)
	at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
	at org.apache.spark.streaming.receiver.ReceiverSupe

----------- 2022-05-01 16:52:20 -----------
Error receiving from the stream!


22/05/01 16:52:20 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9009 - java.net.ConnectException: Connection refused (Connection refused)
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at java.base/java.net.Socket.connect(Socket.java:558)
	at java.base/java.net.Socket.<init>(Socket.java:454)
	at java.base/java.net.Socket.<init>(Socket.java:231)
	at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
	at org.apache.spark.streaming.receiver.ReceiverSupe

----------- 2022-05-01 16:52:22 -----------


                                                                                

Error receiving from the stream!


22/05/01 16:52:22 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9009 - java.net.ConnectException: Connection refused (Connection refused)
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at java.base/java.net.Socket.connect(Socket.java:558)
	at java.base/java.net.Socket.<init>(Socket.java:454)
	at java.base/java.net.Socket.<init>(Socket.java:231)
	at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
	at org.apache.spark.streaming.receiver.ReceiverSupe

In [7]:
spark.stop()
sc.stop()