
Running Your Queries In Spark

You need to take the data from Foursquare and perform your analysis based on the question you chose.

In our example below, we do the following:

We read the files that our Foursquare client writes to the drive. For each city, we get the trending venue categories and the number of people currently at each one. We then add up the counts for matching categories. If you choose, you can extend this into a web dashboard or into plots inside this notebook.
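The "add up the numbers for the same categories" step can be tried in plain Python before any Spark setup. This is a minimal sketch, assuming each log line has the form `category,count`, which is what the parsing code later in this notebook implies. The helper name `aggregate_batch` and the sample lines are hypothetical, not real Foursquare data.

```python
from collections import Counter


def aggregate_batch(lines):
    """Sum visitor counts per category for one batch of 'category,count' lines."""
    totals = Counter()
    for line in lines:
        # Assumed line format: first field is the category, second the count
        category, count = line.split(",")[:2]
        totals[category] += int(count)
    return dict(totals)


# Hypothetical sample lines for illustration only
sample_batch = ["Coffee Shop,12", "Park,5", "Coffee Shop,3"]
print(aggregate_batch(sample_batch))  # prints {'Coffee Shop': 15, 'Park': 5}
```

The Spark version further down does the same thing per batch, with `map` doing the parsing and `reduceByKey` doing the summing.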

In [0]:
# Install Java, Spark, Findspark and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
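# The original mirror URL (apache.osuosl.org) failed for this release, which
# leaves tar with no archive to unpack; the Apache archive keeps old releases
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz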
!tar xf spark-2.4.0-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

!pip install -q findspark
!pip install pyspark

# mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

tar: spark-2.4.0-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=0b0156f6ebd30b453aaf7b92177af48619050e7243f81eb176c0650532f36fb3
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Success

In [0]:
# Import the relevant modules
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

In [0]:
# The code below deletes all the log files inside the foursquare_logs directory
import shutil
folder = "/content/gdrive/My Drive/Colab Datasets/foursquare_logs"
for the_file in os.listdir(folder):
    file_path = os.path.join(folder, the_file)
    try:
        if os.path.isfile(file_path):
            os.unlink(file_path)
        #elif os.path.isdir(file_path): shutil.rmtree(file_path)
    except Exception as e:
        print(e)


In [0]:
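def updateFunction(newValues, runningCount):
    # newValues: the counts seen for this key in the current batch
    # runningCount: the total so far, or None the first time a key appears
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)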
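To see what this kind of update function does across batches, the loop below imitates the way `updateStateByKey` calls it: once per key per batch, with the new values for that key and the previous state. It is only a plain-Python simulation under that assumption, not Spark itself. The name `update_running_total` and the batch contents are hypothetical.

```python
def update_running_total(new_values, running_total):
    """Same logic as the update function above, restated so this cell runs alone."""
    if running_total is None:
        running_total = 0
    return sum(new_values, running_total)


# Hypothetical per-batch values, already grouped by key
batches = [
    {"Coffee Shop": [15], "Park": [5]},
    {"Coffee Shop": [4]},  # no new "Park" values in the second batch
]

state = {}
for batch in batches:
    # Every known key is updated, even when the batch has no new values for it
    for key in sorted(set(state) | set(batch)):
        state[key] = update_running_total(batch.get(key, []), state.get(key))

print(state)  # prints {'Coffee Shop': 19, 'Park': 5}
```

A key with no new values keeps its previous total, because `sum([], running_total)` returns `running_total` unchanged.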

In [0]:
# create spark configuration
conf = SparkConf()
conf.setAppName("FoursquareStreamApp")

# create spark context with the above configuration
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")

# create the Streaming Context from the above spark context with 
# interval size 10 seconds
ssc = StreamingContext(sc,10)

# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_FoursquareApp")

# read data from drive
dataStream = ssc.textFileStream("/content/gdrive/My Drive/Colab Datasets/foursquare_logs")
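`textFileStream` processes files as they are discovered in the directory and ignores later changes to a file it has already processed. A writer that appends to a file slowly can therefore lose data. One common approach, sketched below, is to write each file in a separate staging directory and then rename it into the watched directory once it is complete. The helper `publish_log`, the directory names, and the file name are hypothetical. The demo uses a temporary directory rather than Google Drive, where rename behaviour may differ.

```python
import os
import tempfile


def publish_log(lines, watched_dir, staging_dir, filename):
    """Write lines to a staging file, then move the finished file into watched_dir."""
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(watched_dir, exist_ok=True)
    staged_path = os.path.join(staging_dir, filename)
    with open(staged_path, "w") as handle:
        handle.write("\n".join(lines) + "\n")
    final_path = os.path.join(watched_dir, filename)
    # os.replace is a single rename when both paths are on the same filesystem
    os.replace(staged_path, final_path)
    return final_path


# Demo in a throwaway directory (hypothetical names and data)
base = tempfile.mkdtemp()
watched = os.path.join(base, "foursquare_logs")
staging = os.path.join(base, "staging")
path = publish_log(["Park,5", "Coffee Shop,12"], watched, staging, "batch_0001.txt")
print(os.listdir(watched))  # prints ['batch_0001.txt']
```

The staging directory should sit outside the watched directory so that Spark never sees a half-written file.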

In [0]:
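# Finally, we implement our primary workflow: each line is split on commas,
# the first field is used as the key and the second as the visitor count,
# and the counts are summed per key within the batch.
# After that, we begin the streaming with ssc.start().
# The stream keeps running until it is stopped; ssc.awaitTermination() blocks until then.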
visitor_counts = dataStream.map(lambda x: (x.split(",")[0], int(x.split(",")[1]))).reduceByKey(lambda a, b: a + b)

runningCounts = visitor_counts.updateStateByKey(updateFunction)

runningCounts.pprint()

# start the streaming computation
ssc.start()

# wait for the streaming to finish
ssc.awaitTermination()
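The `map` step above raises an exception on any line that has no comma or a non-numeric count, and a single bad line can fail the whole batch. A more defensive parser is sketched below. The name `parse_line` is hypothetical, and it assumes that silently skipping malformed lines is acceptable for your analysis.

```python
def parse_line(line):
    """Return [(key, count)] for a well-formed line, or [] so flatMap drops it."""
    parts = line.split(",")
    if len(parts) < 2:
        return []
    try:
        return [(parts[0], int(parts[1]))]
    except ValueError:
        # The second field was not an integer
        return []


# Quick check on hypothetical lines
print(parse_line("Park,5"))     # prints [('Park', 5)]
print(parse_line("not a row"))  # prints []

# With Spark, this could replace the map step (not run here):
# visitor_counts = dataStream.flatMap(parse_line).reduceByKey(lambda a, b: a + b)
```

Returning a list lets `flatMap` act as a combined map and filter: valid lines contribute one pair and invalid lines contribute none.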

In [0]:
# When the running process halts, you may need to stop the current 
# Spark Context by running the following cell:
ssc.stop()