The Foursquare_streaming_challenge notebook reads log files from the foursquare_logs folder using Spark Streaming. As an example, it keeps a running total of the number of people at trending venues, grouped by venue category.

In [10]:
# Install Java, Spark, Findspark and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

!pip install -q findspark
!pip install pyspark

# mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

tar: spark-2.4.0-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now
Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
#import relevant modules
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

In [0]:
# Remove every regular file that sits directly inside the foursquare_logs
# directory. Sub-directories are left in place, and a failure on one entry is
# printed instead of aborting the clean-up.

import shutil
folder = "/content/gdrive/My Drive/Colab Datasets/foursquare_logs"
for entry_name in os.listdir(folder):
    entry_path = os.path.join(folder, entry_name)
    try:
        # Guard clause: anything that is not a regular file is skipped.
        if not os.path.isfile(entry_path):
            continue
        os.unlink(entry_path)
    except Exception as err:
        print(err)

In [0]:
# State-update helper that keeps the aggregate number for each key across batches.

def updateFunction(newValues, runningCount):
    """Return the running total after adding the values of the current batch.

    newValues    -- iterable of the values that arrived for a key in this batch
    runningCount -- total accumulated so far, or None when there is no
                    previous state for the key (treated as 0)
    """
    starting_total = 0 if runningCount is None else runningCount
    return sum(newValues, starting_total)

In [0]:
# Spark configuration carrying the application name
conf = SparkConf().setAppName("FoursquareStreamApp")

# Obtain the Spark context for that configuration (an existing one is reused)
# and restrict logging to errors
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")

# Streaming context on top of the Spark context, with a batch interval of
# 10 seconds
ssc = StreamingContext(sc, batchDuration=10)

# Checkpoint directory, set to allow RDD recovery
ssc.checkpoint("checkpoint_FoursquareApp")

# Text-file stream over the log folder on the mounted Drive
dataStream = ssc.textFileStream("/content/gdrive/My Drive/Colab Datasets/foursquare_logs")

In [0]:
# Primary workflow: turn every comma-separated line into a (key, count) pair,
# add up the counts per key within a batch, then fold each batch total into a
# running total per key with updateFunction.
# ssc.start() begins the streaming; ssc.awaitTermination() keeps this cell
# open until the streaming is terminated.

def _to_key_count(line):
    """Map one log line to (first field, second field converted to int)."""
    fields = line.split(",")
    return (fields[0], int(fields[1]))

keyed_counts = dataStream.map(_to_key_count)
visitor_counts = keyed_counts.reduceByKey(lambda left, right: left + right)

runningCounts = visitor_counts.updateStateByKey(updateFunction)

# Print the running totals of every batch
runningCounts.pprint()

# Launch the streaming computation, then wait for it to finish
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2019-12-11 22:53:10
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:53:20
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:53:30
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:53:40
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:53:50
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:54:00
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:54:10
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:54:20
-------------------------------------------

-------------------------------------------
Time: 2019-12-11 22:54:30
----------

In [0]:
# Run this cell once the streaming cell has halted in order to shut the
# streaming context down. Both arguments are written out at their default
# values: the underlying Spark context is stopped as well, and the stop is
# not graceful.

ssc.stop(stopSparkContext=True, stopGraceFully=False)