In [None]:
import datetime
import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Kafka topic carrying the crawler's social-media records, and the broker to read it from.
KAFKA_TOPIC = "social_media"
BOOTSTRAP_SERVER = "localhost:9092"

# Micro-batch interval of the StreamingContext, in seconds.
stream_period_seconds = 5
# NOTE(review): `sc` is not defined in this file; presumably it is the SparkContext
# provided by the notebook / pyspark shell.
ssc = StreamingContext(sc, stream_period_seconds)
# Checkpoint directory; Spark requires checkpointing for stateful operations such as
# the updateStateByKey call in calculate_aggregate below.
ssc.checkpoint("./checkpoint")
# Kafka stream; parse_data_flat_map below consumes each element as a (key, value) pair
# and only uses the value (a JSON string).
lines = KafkaUtils.createDirectStream(ssc, [KAFKA_TOPIC],
                                      {"metadata.broker.list": BOOTSTRAP_SERVER})

# WIB (Western Indonesian Time): a fixed UTC+07:00 offset used for all timestamps below.
TIMEZONE = datetime.timezone(datetime.timedelta(hours=7)) # WIB

def parse_data_flat_map(key_value):
    """Decode one Kafka record and tag it with its social-media source.

    Intended for ``DStream.flatMap``: returns ``[(social_media, payload)]`` for a
    usable record, or ``[]`` (after printing a warning) when the record must be
    skipped. A record is skipped when its value is not valid JSON (including a
    ``None`` value), when the JSON is not an object, or when it lacks a non-empty
    string at ``crawler_target.specific_resource_type``.

    Args:
        key_value: ``(key, value)`` pair from Kafka; only the value (a JSON
            string) is used.

    Returns:
        A list with zero or one ``(social_media, payload)`` tuples, where
        ``payload`` is the decoded JSON object.
    """
    json_string = key_value[1]
    try:
        payload = json.loads(json_string)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a null Kafka value (json.loads(None)); one bad record
        # must not fail the whole micro-batch.
        print("---")
        print("Warning: broken JSON ignored")
        print(json_string)
        print("---")
        return []

    # Guard every level: valid JSON may still be a list/number, and
    # crawler_target may be null, which previously raised AttributeError.
    crawler_target = payload.get("crawler_target") if isinstance(payload, dict) else None
    if isinstance(crawler_target, dict):
        social_media = crawler_target.get("specific_resource_type", "")
    else:
        social_media = ""

    if isinstance(social_media, str) and social_media != "":
        return [(social_media, payload)]

    print("---")
    print("Warning: unidentified social media ignored")
    print(json_string)
    print("---")
    return []
    
def facebook_extract_map(payload):
    """Extract the post time and author id from a Facebook record.

    Args:
        payload: decoded Facebook record; reads ``created_time`` (e.g.
            ``2023-04-04T17:09:23+0000``) and ``from["id"]``.

    Returns:
        ``{"timestamp": <naive datetime in TIMEZONE>, "user_id": ...}``, or
        ``None`` when a required field is missing, ``from`` is not an object,
        or ``created_time`` cannot be parsed.
    """
    created_time = payload.get("created_time")
    sender = payload.get("from")
    if created_time is None or not isinstance(sender, dict) or "id" not in sender:
        return None

    timestamp_pattern = "%Y-%m-%dT%H:%M:%S%z" # e.g.: 2023-04-04T17:09:23+0000
    try:
        created = datetime.datetime.strptime(created_time, timestamp_pattern)
    except (TypeError, ValueError):
        # Unparseable timestamp: report as malformed instead of failing the batch.
        return None

    return {
        # The parsed value is offset-aware, so astimezone converts it correctly to WIB.
        "timestamp": created.astimezone(TIMEZONE).replace(tzinfo=None),
        "user_id": sender["id"],
    }

def instagram_extract_map(payload):
    """Extract the post time and author id from an Instagram record.

    Args:
        payload: decoded Instagram record; reads ``created_time`` (Unix epoch
            seconds, as an int or an int-valued string) and ``user["id"]``.

    Returns:
        ``{"timestamp": <naive datetime in TIMEZONE>, "user_id": ...}``, or
        ``None`` when a required field is missing, ``user`` is not an object,
        or ``created_time`` is not a valid epoch value.
    """
    created_time = payload.get("created_time")
    user = payload.get("user")
    if created_time is None or not isinstance(user, dict) or "id" not in user:
        return None

    try:
        epoch_seconds = int(created_time)
        # Convert straight to TIMEZONE instead of going through the machine's
        # local time zone.
        created = datetime.datetime.fromtimestamp(epoch_seconds, tz=TIMEZONE)
    except (TypeError, ValueError, OverflowError, OSError):
        # Non-numeric or out-of-range epoch value: treat the record as malformed.
        return None

    return {
        "timestamp": created.replace(tzinfo=None),
        "user_id": user["id"],
    }

def twitter_extract_map(payload):
    """Extract the tweet time and author id from a Twitter record.

    Args:
        payload: decoded Twitter record; reads ``created_at`` (e.g.
            ``Sat Nov 10 16:09:22 +0000 2012``) and ``user_id``.

    Returns:
        ``{"timestamp": <naive datetime in TIMEZONE>, "user_id": ...}``, or
        ``None`` when ``created_at``/``user_id`` is missing or ``created_at``
        cannot be parsed.
    """
    created_at = payload.get("created_at")
    if created_at is None or "user_id" not in payload:
        return None

    # NOTE: %a and %b match the *locale's* day/month abbreviations, so this
    # parse assumes an English (C) locale on the executors.
    timestamp_pattern = "%a %b %d %H:%M:%S %z %Y" # e.g.: Sat Nov 10 16:09:22 +0000 2012
    try:
        created = datetime.datetime.strptime(created_at, timestamp_pattern)
    except (TypeError, ValueError):
        # Unparseable timestamp: report as malformed instead of failing the batch.
        return None

    return {
        "timestamp": created.astimezone(TIMEZONE).replace(tzinfo=None),
        "user_id": payload["user_id"],
    }
    
def youtube_extract_map(payload):
    """Extract the publish time and channel id from a YouTube record.

    Args:
        payload: decoded YouTube record; reads ``snippet["publishedAt"]``
            (UTC, e.g. ``2023-04-04T17:09:24Z``) and ``snippet["channelId"]``.

    Returns:
        ``{"timestamp": <naive datetime in TIMEZONE>, "user_id": <channelId>}``,
        or ``None`` when ``snippet`` is not an object, a required field is
        missing, or ``publishedAt`` cannot be parsed.
    """
    snippet = payload.get("snippet")
    if not isinstance(snippet, dict):
        return None
    published_at = snippet.get("publishedAt")
    if published_at is None or "channelId" not in snippet:
        return None

    timestamp_pattern = "%Y-%m-%dT%H:%M:%SZ" # e.g.: 2023-04-04T17:09:24Z
    try:
        published = datetime.datetime.strptime(published_at, timestamp_pattern)
    except (TypeError, ValueError):
        # Unparseable timestamp: report as malformed instead of failing the batch.
        return None

    # strptime returns a naive datetime here, and astimezone() would interpret a
    # naive value as the *machine's* local time. The trailing "Z" means UTC, so
    # attach UTC explicitly before converting to WIB.
    published_utc = published.replace(tzinfo=datetime.timezone.utc)
    return {
        "timestamp": published_utc.astimezone(TIMEZONE).replace(tzinfo=None),
        "user_id": snippet["channelId"],
    }
    
def print_malformed_data_warning(social_media, payload):
    """Print a ``---``-delimited warning for a record whose fields could not be extracted.

    Args:
        social_media: source name shown in the warning line.
        payload: the offending record, printed with ``print``'s default formatting.
    """
    separator = "---"
    message = f"Warning: ignoring malformed data from {social_media}"
    for item in (separator, message, payload, separator):
        print(item)
    
def extract_flat_map(key_value):
    """Normalize a tagged record into ``{"timestamp", "user_id"}`` via its source's extractor.

    Intended for ``DStream.flatMap``. Returns ``[(social_media, extracted)]`` on
    success. Returns ``[]`` after printing a warning when the source is not one of
    facebook/instagram/twitter/youtube, or when its extractor returns ``None``.

    Args:
        key_value: ``(social_media, payload)`` as produced by parse_data_flat_map.
    """
    social_media, payload = key_value

    # Choose the extractor for this source; unknown sources are reported and dropped.
    if social_media == "facebook":
        extractor = facebook_extract_map
    elif social_media == "instagram":
        extractor = instagram_extract_map
    elif social_media == "twitter":
        extractor = twitter_extract_map
    elif social_media == "youtube":
        extractor = youtube_extract_map
    else:
        for line in ("---", f"Warning: unhandled social media: {social_media}", payload, "---"):
            print(line)
        return []

    extract_result = extractor(payload)
    if extract_result is None:
        print_malformed_data_warning(social_media, payload)
        return []
    return [(social_media, extract_result)]
    
def binned_timestamp_map(key_value, bin_minutes=5):
    """Key an extracted record by ``"<social_media>;<bin start, ISO format>"``.

    The record's ``timestamp`` is floored to the start of its ``bin_minutes``-wide
    bucket within the hour, with seconds *and* microseconds zeroed. Every record
    in the same bucket therefore gets exactly the same key string.

    Args:
        key_value: ``(social_media, payload)`` where ``payload["timestamp"]`` is
            a ``datetime``.
        bin_minutes: bucket width in minutes (default 5). Use a divisor of 60,
            otherwise the last bucket of each hour is shorter.

    Returns:
        ``(key, payload)`` with ``payload`` passed through unchanged.
    """
    social_media, payload = key_value
    timestamp = payload["timestamp"]
    bin_start = timestamp.replace(
        minute=(timestamp.minute // bin_minutes) * bin_minutes,
        second=0,
        # Without this, a sub-second timestamp would render as e.g.
        # "...T00:05:00.250000" and split one bin into several keys.
        microsecond=0,
    )
    return (f"{social_media};{bin_start.isoformat()}", payload)

def update_function(new_payload_list, old_payload):
    """Fold one batch of extracted records into the running state of a bin key.

    Used as the ``updateStateByKey`` callback in calculate_aggregate.

    Args:
        new_payload_list: this batch's records for the key; each is a dict with
            a ``"user_id"`` entry.
        old_payload: the key's previous state dict, or ``None`` if it has none yet.

    Returns:
        ``old_payload`` unchanged (the same object) when the batch is empty.
        Otherwise a state dict with ``count``, ``unique_count``, ``user_ids``,
        ``created_at`` and ``updated_at``: ``old_payload`` updated in place, or a
        fresh dict for a new key.

    NOTE(review): this never returns ``None`` for an existing key, so every bin's
    state (including its ``user_ids`` set) is kept for the lifetime of the stream.
    """
    def wib_now():
        # Current wall-clock time converted to TIMEZONE, stored as a naive datetime.
        return datetime.datetime.now().astimezone(TIMEZONE).replace(tzinfo=None)

    if not new_payload_list:
        return old_payload

    state = old_payload
    if state is None:
        state = {
            "count": 0,
            "unique_count": 0,
            "user_ids": set(),
            "created_at": wib_now(),
            "updated_at": wib_now(),
        }

    seen_users = state["user_ids"]
    for record in new_payload_list:
        state["count"] += 1
        user = record["user_id"]
        if user not in seen_users:
            state["unique_count"] += 1
            seen_users.add(user)

    state["updated_at"] = wib_now()
    return state

def calculate_aggregate(lines, window_length=2, sliding_interval=2):
    """Build the per-source, per-5-minute-bin running post/user counts.

    Pipeline: decode and tag each Kafka record (parse_data_flat_map), then
    normalize it to ``{timestamp, user_id}`` (extract_flat_map), then key it by
    ``"<source>;<bin start>"`` (binned_timestamp_map), and finally accumulate
    the per-key state across batches (update_function).

    Args:
        lines: Kafka DStream of ``(key, value)`` pairs.
        window_length: accepted for interface compatibility; not used by this pipeline.
        sliding_interval: accepted for interface compatibility; not used by this pipeline.

    Returns:
        A DStream of ``(key, state_dict)`` pairs.
    """
    return (
        lines.flatMap(parse_data_flat_map)
        .flatMap(extract_flat_map)
        .map(binned_timestamp_map)
        .updateStateByKey(update_function)
    )

# run the function: build the aggregation pipeline on the Kafka stream
result = calculate_aggregate(lines, window_length=2, sliding_interval=2)
# Print each batch's aggregated (key, state) pairs to stdout
result.pprint()
# Start the streaming computation; awaitTermination() blocks this cell until the context is stopped.
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2023-04-11 13:32:15
-------------------------------------------

-------------------------------------------
Time: 2023-04-11 13:32:20
-------------------------------------------
('instagram;2023-04-05T00:05:00', {'count': 2, 'unique_count': 2, 'user_ids': {'MXSZEKQMAAQULOEAHCPN', 'NAWWQNDUURKZDMYBMCOI'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562644), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562724)})
('twitter;2023-04-04T23:35:00', {'count': 1, 'unique_count': 1, 'user_ids': {'GGYFPIBGLMUIXSEOYHRP'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562804), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562818)})
('facebook;2023-04-05T00:05:00', {'count': 2, 'unique_count': 2, 'user_ids': {'QJELCOCJYXPVVNMKZBOE', 'NQKUNTHKIDVRNAPVSNOX'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 21, 21522), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 21, 21577)})
('youtube;2023-04

-------------------------------------------
Time: 2023-04-11 13:32:50
-------------------------------------------
('instagram;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'MXSZEKQMAAQULOEAHCPN', 'NAWWQNDUURKZDMYBMCOI'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562644), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183835)})
('twitter;2023-04-04T23:35:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GGYFPIBGLMUIXSEOYHRP'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562804), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183903)})
('facebook;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'QJELCOCJYXPVVNMKZBOE', 'NQKUNTHKIDVRNAPVSNOX'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 21, 21522), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 307703)})
('youtube;2023-04-04T17:05:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GXZXOAYWEFCORHECIZUT'}, 'created_at': datetime.dateti

-------------------------------------------
Time: 2023-04-11 13:33:20
-------------------------------------------
('instagram;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'MXSZEKQMAAQULOEAHCPN', 'NAWWQNDUURKZDMYBMCOI'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562644), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183835)})
('twitter;2023-04-04T23:35:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GGYFPIBGLMUIXSEOYHRP'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562804), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183903)})
('facebook;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'QJELCOCJYXPVVNMKZBOE', 'NQKUNTHKIDVRNAPVSNOX'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 21, 21522), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 307703)})
('youtube;2023-04-04T17:05:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GXZXOAYWEFCORHECIZUT'}, 'created_at': datetime.dateti

-------------------------------------------
Time: 2023-04-11 13:33:50
-------------------------------------------
('instagram;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'MXSZEKQMAAQULOEAHCPN', 'NAWWQNDUURKZDMYBMCOI'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562644), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183835)})
('twitter;2023-04-04T23:35:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GGYFPIBGLMUIXSEOYHRP'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562804), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183903)})
('facebook;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'QJELCOCJYXPVVNMKZBOE', 'NQKUNTHKIDVRNAPVSNOX'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 21, 21522), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 307703)})
('youtube;2023-04-04T17:05:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GXZXOAYWEFCORHECIZUT'}, 'created_at': datetime.dateti

-------------------------------------------
Time: 2023-04-11 13:34:20
-------------------------------------------
('instagram;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'MXSZEKQMAAQULOEAHCPN', 'NAWWQNDUURKZDMYBMCOI'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562644), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183835)})
('twitter;2023-04-04T23:35:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GGYFPIBGLMUIXSEOYHRP'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 20, 562804), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 183903)})
('facebook;2023-04-05T00:05:00', {'count': 4, 'unique_count': 2, 'user_ids': {'QJELCOCJYXPVVNMKZBOE', 'NQKUNTHKIDVRNAPVSNOX'}, 'created_at': datetime.datetime(2023, 4, 11, 13, 32, 21, 21522), 'updated_at': datetime.datetime(2023, 4, 11, 13, 32, 45, 307703)})
('youtube;2023-04-04T17:05:00', {'count': 2, 'unique_count': 1, 'user_ids': {'GXZXOAYWEFCORHECIZUT'}, 'created_at': datetime.dateti

# HandsOn Week 9
Welcome to HandsOn Week 9. In this HandsOn, you will experiment with Spark Streaming, using data sent by a Kafka producer.

## Setting up
Since Spark Streaming from Kafka is not supported in Spark version 3.1.1, you need to set up or install the following:
1. Download Apache Spark version ~~2.7.4~~ __2.4.7__ with Hadoop 2.7 from [https://spark.apache.org/downloads.html](https://spark.apache.org/downloads.html)
2. Unzip the tgz file
3. Open your bashrc file with `nano ~/.bashrc`, find the following variables, and set their values to `SPARK_HOME=~/Downloads/spark-2.4.7-bin-hadoop2.7` and `PYSPARK_PYTHON=python3.7`.
4. Apply the changes with `source ~/.bashrc`.



## Milestone 1
Install kafka-python with ```pip install kafka-python```.<br><br>
In this milestone, you only need to run ```producer_variance.py``` and ```consumer_variance.py``` (these two code files are already provided inside the zip file).

Take a screenshot of your ```consumer_variance.py``` output and put it in the cell below.

<img src="./consumer_variance_output.png" alt="Text &#8220;message: 1 2 3 4 5 6&#8221; printed many times."/>

## Milestone 2
Once you have confirmed in Milestone 1 that messages are published by ```producer_variance.py``` and successfully consumed by ```consumer_variance.py``` on the ```variance``` topic, you are ready for Milestone 2.<br>

In Milestone 2, you need to implement the ```calculate_variance``` function with the formula --> $variance = \frac{\sum_{i=1}^{N}x_i^2}{N}-(\frac{\sum_{i=1}^{N}x_i}{N})^2$. This function calculates the variance for each window operation on the streaming data, and the variance is an "**accumulative/global**" value covering all stream data received so far. For example, if the first window contains the data ~~```1,2,3```~~ __`1 2 3`__, then the variance is the variance of ~~```1,2,3```~~ __```1 2 3```__. If the second window then contains the data ~~```4,5,6```~~ __```4 5 6```__, the variance in this second window is the variance of ~~```1,2,3,4,5,6```~~ __```1 2 3 4 5 6```__, and so on for the following windows.<br>

The ```calculate_variance``` function returns a DStream (RDD) with the format ```('sum_x_square:', sum_x_square_value, 'sum_x:', sum_x_value, 'n:', n_value, 'var:', variance_value)```, where ```sum_x_square_value```$=\sum_{i=1}^{N}x_i^2$, ```sum_x_value```$=\sum_{i=1}^{N}x_i$ and ```n_value```$=N$. Here $x_i$ is the $i$-th individual stream value, and $N$ is the number (count) of individual stream values received so far.

**Important:** To stream from the Kafka producer to Spark Streaming, you need to download [spark-streaming-kafka-0-8-assembly_2.11-2.4.7.jar](https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.7) from the Maven repository (adjust the version to match your Spark version) and put this jar file in ```your_spark_folder/jars```. On the VM provided by the class, ```your_spark_folder``` is ```/home/bigdata/spark-2.4.5-bin-hadoop2.7```.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Kafka topic written by producer_variance.py, and the broker to read it from.
KAFKA_TOPIC = "variance"
BOOTSTRAP_SERVER = "localhost:9092"

# NOTE(review): `sc` is not defined in this file; presumably the notebook's SparkContext.
ssc = StreamingContext(sc,1) #stream each one second (1-second micro-batches)
# Checkpoint directory; Spark requires checkpointing for stateful operations such as updateStateByKey.
ssc.checkpoint("./checkpoint")
# Kafka stream; calculate_variance consumes each element as a (key, value) pair.
lines = KafkaUtils.createDirectStream(ssc, [KAFKA_TOPIC],
                                      {"metadata.broker.list": BOOTSTRAP_SERVER})

def calculate_variance(lines, window_length = 2, sliding_interval = 2):
    """
    Function to calculate "accumulated/global variance" in each window operation
    Params:
        lines: Spark DStream defined above (in this jupyter cell); each element is a
               Kafka (key, value) pair whose value holds whitespace-separated integers
        window_length: length of window in windowing operation (seconds; must be a
                       multiple of the StreamingContext batch interval)
        sliding_interval: sliding interval for the window operation (seconds; same
                          constraint). One result is emitted per sliding interval.
    Return:
        result: DStream (RDD) of variance result with 
                format --> ('sum_x_square:', sum_x_square_value, 'sum_x:', sum_x_value, 'n:', n_value, 'var:', variance_value)
                Example:   ('sum_x_square:', 182.0, 'sum_x:', 42.0, 'n:', 12.0, 'var:', 2.916666666666666)
    """
    # Realize this function here. Note that you are not allowed to modify any code other than this function.
    GLOBAL_KEY = None  # one shared key, so every number feeds a single global accumulator

    def parse_numbers(record):
        # Kafka value -> list of ints. split() with no argument tolerates repeated,
        # leading and trailing whitespace, which split(" ") turned into int("") crashes.
        numbers = []
        for token in (record[1] or "").split():
            try:
                numbers.append(int(token))
            except ValueError:
                print(f"Warning: non-integer token ignored: {token!r}")
        return numbers

    def add_sums(left, right):
        # Element-wise sum of two (sum_x_square, sum_x, n) triples.
        return (left[0] + right[0], left[1] + right[1], left[2] + right[2])

    def update_totals(new_sums, running_sums):
        # updateStateByKey callback: add this batch's partial sums to the running totals.
        totals = (0, 0, 0) if running_sums is None else running_sums
        for partial in new_sums:
            totals = add_sums(totals, partial)
        return totals

    def latest(left, right):
        # n never decreases, so the snapshot with the larger n is the more recent one.
        return left if left[2] >= right[2] else right

    def format_result(totals):
        # Build the documented output tuple from the global totals.
        sum_x_square, sum_x, n = totals
        if n == 0:
            variance = 0.0
        else:
            mean = sum_x / n
            # E[x^2] - E[x]^2 can come out as a tiny negative number from
            # floating-point cancellation; a variance is never negative.
            variance = max(sum_x_square / n - mean ** 2, 0.0)
        return ('sum_x_square:', sum_x_square, 'sum_x:', sum_x, 'n:', n, 'var:', variance)

    partial_sums = (lines.flatMap(parse_numbers)
                    .map(lambda x: (GLOBAL_KEY, (x * x, x, 1)))
                    .reduceByKey(add_sums))
    # Global running totals are updated every batch, so each value is counted exactly
    # once whatever the window parameters are (even with overlapping windows).
    running_totals = partial_sums.updateStateByKey(update_totals)
    # The window only controls when results are emitted: once per sliding interval,
    # reporting the most recent running totals seen inside the window.
    windowed_totals = running_totals.reduceByKeyAndWindow(
        latest, None, window_length, sliding_interval)
    result = windowed_totals.map(lambda key_totals: format_result(key_totals[1]))

    return result

# run the function: build the variance pipeline on the Kafka stream
result = calculate_variance(lines, window_length=2, sliding_interval=2)
# Print the results to stdout as they are produced
result.pprint()
# Start the streaming computation; awaitTermination() blocks this cell until the context is stopped.
ssc.start()
ssc.awaitTermination()

## Submission
Archive this ipynb file and the screenshot image required in Milestone 1 into a zip file named ~~```HandsOnWeek11_NIM_FullName.zip```~~ __```HandsOnWeek9_NIM_FullName.zip```__, and submit it through the submission form.

**Note**: make sure the Milestone 2 cell keeps its output, but not too many streams (save this ipynb file after approximately 4–20 window operations).

Enjoy...

## Future Developer Note
After downloading Spark 2.4.7, be sure to extract the archive completely. Otherwise, some files may be missing, and you can waste more than three hours before you realize it.