# In [None]:
########################
#### Trend equality ####
########################

# Imports
import pymongo, urllib, json, datetime, psycopg2
from pymongo import MongoClient
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from functools import reduce
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Connect to MongoDB
# NOTE(review): the URI and `pw` are redacted placeholders; `pw` is not used in this cell.
pw = ""
with MongoClient('mongodb+srv:') as cluster:
    db = cluster["trends"]
    collection_berlin = db["berlin"]
    collection_stuttgart = db["stuttgart"]

    # Get trends from MongoDB.
    # find() only returns a lazy cursor; the query runs on first iteration.
    # Materialize the documents while the client is still open, because
    # PyMongo 4+ raises InvalidOperation when a closed client is used again.
    trends_berlin = list(collection_berlin.find())
    trends_stuttgart = list(collection_stuttgart.find())
# Leaving the `with` block closes the MongoDB client (equivalent to cluster.close()).

# Open the PostgreSQL connection that receives one Trend_equality row per
# compared snapshot pair (host/user/password are redacted placeholders).
conn = psycopg2.connect(
    host="",
    port=5432,
    database="trends",
    user="",
    password="",
)

# Reuse the kernel's Spark context and session if they already exist,
# otherwise create them (context first, then session, as before).
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

# Number of top trends compared per snapshot (previously hard-coded as 50).
TREND_LIMIT = 50


def _trend_names(snapshot, limit=TREND_LIMIT):
    """Return the names of the first *limit* trends in *snapshot*, with '#' removed.

    Slicing (instead of indexing 0..49) means a snapshot with fewer trends
    no longer raises IndexError.
    """
    return [trend["name"].replace("#", "") for trend in snapshot["trends"][:limit]]


def _snapshot_datetime(as_of):
    """Parse an ``as_of`` string such as ``2021-05-20T12:00:00Z`` and shift it by +2 hours."""
    parsed = datetime.datetime.strptime(
        as_of.replace("T", " ").replace("Z", ""), '%Y-%m-%d %H:%M:%S'
    )
    # NOTE(review): fixed +2 h presumably converts UTC to German summer time (CEST);
    # it ignores DST, so winter timestamps would be off by one hour — confirm intent.
    return parsed + datetime.timedelta(hours=2)


try:
    # NOTE(review): snapshots are paired purely by position. find() is called without
    # a sort, so this relies on both collections having matching order and count;
    # zip() silently drops any unmatched extra snapshots.
    for trends_b, trends_s in zip(trends_berlin, trends_stuttgart):

        # One single-column ("value") DataFrame per city. distinct() prevents
        # duplicate names (e.g. "#Foo" and "Foo") from multiplying in the join
        # and pushing the score above 100%.
        df_trends_b = spark.createDataFrame(_trend_names(trends_b), StringType()).distinct()
        df_trends_s = spark.createDataFrame(_trend_names(trends_s), StringType()).distinct()

        # Trend equality = share of the top TREND_LIMIT names present in both cities, in percent
        shared = df_trends_b.join(df_trends_s, on="value", how="inner").count()
        trend_equality = round((shared / TREND_LIMIT * 100), 0)

        # Timestamp of the Berlin snapshot
        date = _snapshot_datetime(trends_b["as_of"])

        # Insert data into PostgreSQL with a parameterized query (psycopg2 adapts
        # the float and datetime); the cursor is closed even if execute() raises.
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO Trend_equality (trend_equality, date) VALUES (%s, %s)",
                (trend_equality, date),
            )
        conn.commit()

        # Print results (the last line is a human-readable echo of the inserted values)
        print("-----------------------------------------------------------------------------")
        print(f"Trend equality: {trend_equality}")
        print(f"Datetime: {date}")
        print(f"INSERT INTO Trend_equality (trend_equality, date) VALUES ({trend_equality},'{date}')")
        print("-----------------------------------------------------------------------------")
finally:
    # Close connection to DB and Spark even if a snapshot fails mid-loop
    conn.close()
    spark.stop()