<a href="https://colab.research.google.com/github/agrr12/TCC/blob/master/TCC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!pip install pyspark
#import pyspark
#from pyspark.sql import SparkSession
#from pyspark import SparkFiles
#from pyspark.sql.functions import *
from google.colab import drive
#from pyspark.sql.types import *
import numpy as np
from scipy import sparse
from datetime import *
import matplotlib.pyplot as plt
import cv2
import os
import pandas as pd
from skimage.metrics import structural_similarity as ssim, peak_signal_noise_ratio as psnr
import warnings
import seaborn as sns
from PIL import Image
from collections import Counter

# Definition of Auxiliary Functions

## 1. create_sparse_matrix
- **Purpose**: Creates a sparse matrix based on the difference between two date strings.
- **Description**: Returns a matrix with rows as days and columns corresponding to the desired unit (seconds, minutes, or hours).

In [None]:
def create_sparse_matrix(start_date_str: str, end_date_str: str, unit: str = 'seconds') -> sparse.dok_matrix:
    """
    Create an all-zero sparse matrix sized from the span between two date strings.

    Parameters:
    - start_date_str (str): Start date in the format '%Y-%m-%d %H:%M:%S'.
    - end_date_str (str): End date in the format '%Y-%m-%d %H:%M:%S'.
    - unit (str): Desired unit for the matrix columns ('seconds', 'minutes', or 'hours').

    Returns:
    - sparse.dok_matrix: A matrix with shape (days, units_per_day), where `days`
      is the number of whole days between the two dates plus one.

    Raises:
    - ValueError: If `unit` is not one of the supported values, or if a date
      string does not match the expected format.
    """
    # Number of columns needed to address every slot of a single day.
    units_per_day_by_unit = {
        'seconds': 24 * 60 * 60,
        'minutes': 24 * 60,
        'hours': 24,
    }
    if unit not in units_per_day_by_unit:
        raise ValueError(
            f"unit must be one of {sorted(units_per_day_by_unit)}, got {unit!r}"
        )
    units_per_day = units_per_day_by_unit[unit]

    # Define the date format and parse the start and end dates
    fmt = '%Y-%m-%d %H:%M:%S'
    d1 = datetime.strptime(start_date_str, fmt)
    d2 = datetime.strptime(end_date_str, fmt)

    # Whole days between the two dates, +1 to include the last day
    days = (d2 - d1).days + 1

    # int32 rather than int8: a single slot can collect more than 127 events
    # (e.g. at hourly granularity), which would overflow an 8-bit counter.
    # Storage stays sparse, so the wider dtype costs nothing until densified.
    return sparse.dok_matrix((days, units_per_day), dtype=np.int32)

## 2. generate_image
- **Purpose**: Visualizes date-time occurrences from a sparse matrix.
- **Description**: Processes date-time objects, updates the sparse matrix based on occurrences, and then creates an image (either hexbin or scatter) to represent the data visually. The resultant image is saved in a specified directory.

In [None]:
def generate_image(dates, unit= 'seconds', sparse_matrix= None, image_type='hexbin', start_date_str = "", grid=10, image_id=None):
    """
    Accumulate date-time occurrences into `sparse_matrix` and save them as an image.

    Parameters:
    - dates: iterable of datetime objects and/or strings in the format
      '%Y-%m-%d %H:%M:%S'.
    - unit (str): 'seconds', 'minutes', or 'hours'; selects the column index
      computed for each date and the output sub-folder ('days_<unit>').
    - sparse_matrix: matrix of shape (days, units_per_day) that is updated in
      place; rows are days since `start_date_str`, columns are slots in a day.
    - image_type (str): 'hexbin' or 'scatter'. Any other value saves an empty
      figure, since nothing is drawn before the save.
    - start_date_str (str): first instant of the window, format '%Y-%m-%d %H:%M:%S'.
    - grid (int): `gridsize` passed to `plt.hexbin`.
    - image_id: file stem of the saved PNG. When omitted, the module-level
      name `x` is used, which is how existing callers supply it.

    Raises:
    - ValueError: if `unit` is unsupported or `sparse_matrix` is missing.

    Side effects:
    - Increments cells of `sparse_matrix`, writes a PNG below
      '/content/drive/My Drive/TCC/images_dir/', and clears the current figure.
    """
    fmt = '%Y-%m-%d %H:%M:%S'
    if unit not in ('seconds', 'minutes', 'hours'):
        raise ValueError(f"unit must be 'seconds', 'minutes' or 'hours', got {unit!r}")
    if sparse_matrix is None:
        raise ValueError("sparse_matrix is required; build one with create_sparse_matrix()")

    # Backward compatibility: callers that predate `image_id` rely on a
    # module-level `x` holding the image number. Resolve it up front so a
    # missing name fails before any work is done.
    image_name = x if image_id is None else image_id

    folder = f'days_{unit}'
    # Parse the window start once instead of once per date.
    start = datetime.strptime(start_date_str, fmt)
    n_days = sparse_matrix.shape[0]

    # Loop through each date and update the matrix based on the desired unit
    for d in dates:
        if isinstance(d, str):
            d = datetime.strptime(d, fmt)
        day = (d - start).days
        if unit == 'seconds':
            index = d.hour * 3600 + d.minute * 60 + d.second
        elif unit == 'minutes':
            index = d.hour * 60 + d.minute
        else:  # 'hours'
            index = d.hour
        # Dates outside the matrix's day range are ignored; a negative row
        # would otherwise wrap around and be drawn on the wrong day.
        if not 0 <= day < n_days:
            continue
        # `day` and `index` are already 0-based, so they address the cell directly.
        sparse_matrix[day, index] += 1

    if image_type == 'hexbin':
        dense_matrix = sparse_matrix.toarray()
        # Row/column coordinates of every cell, flattened alongside the counts
        row_idx, col_idx = np.indices(dense_matrix.shape)
        plt.hexbin(col_idx.flatten(), row_idx.flatten(), gridsize=grid, cmap='inferno', C=dense_matrix.flatten())
    elif image_type == 'scatter':
        # Fixed axis limits per unit so every image shares the same frame
        x_limits = {
            'seconds': (-10000, 100000),
            'minutes': (-100, 1500),
            'hours': (-5, 26),
        }
        plt.xlim(*x_limits[unit])
        plt.ylim(-10, 100)
        # Get the non-zero entries of the sparse matrix
        nonzero_rows, nonzero_cols = sparse_matrix.nonzero()
        plt.scatter(nonzero_cols, nonzero_rows, marker='s', s=80, alpha=0.2, color='red')
    plt.axis('off')
    plt.savefig(f'/content/drive/My Drive/TCC/images_dir/{folder}/{image_type}/{image_name}.png')
    plt.clf()

## 3. timestamp_frequency
- **Purpose**: Calculates the frequency of timestamps in a specified range.
- **Description**: Computes the frequency of timestamps within the period "2022-09-01 00:00:01" to "2022-11-30 23:59:59". The result is a list where each index indicates the frequency at that timestamp, adjusted based on the given granularity (seconds, minutes, or hours).


In [None]:
from datetime import datetime, timedelta
from collections import Counter

def timestamp_frequency(timestamps, granularity='seconds'):
    """
    Count how many timestamps fall into each interval of the experiment window.

    The window runs from "2022-09-01 00:00:01" to "2022-11-30 23:59:59". For
    'minutes' and 'hours' the window start is truncated to the beginning of its
    minute/hour, so index 0 is the interval that contains the window start.

    Parameters:
    - timestamps: iterable of datetime objects and/or strings in the format
      '%Y-%m-%d %H:%M:%S'.
    - granularity (str): 'minutes' or 'hours'; any other value means seconds.

    Returns:
    - list[int]: one count per interval of the window, in chronological order.
      Timestamps that fall outside the window are ignored.
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    start_timestamp = datetime.strptime("2022-09-01 00:00:01", fmt)
    end_timestamp = datetime.strptime("2022-11-30 23:59:59", fmt)

    # Interval width and the fields to zero out so that every timestamp is
    # reduced to the first instant of its interval.
    if granularity == 'minutes':
        delta = timedelta(minutes=1)
        truncate = {'second': 0, 'microsecond': 0}
    elif granularity == 'hours':
        delta = timedelta(hours=1)
        truncate = {'minute': 0, 'second': 0, 'microsecond': 0}
    else:  # default is seconds
        delta = timedelta(seconds=1)
        truncate = {'microsecond': 0}

    start_timestamp = start_timestamp.replace(**truncate)

    # Calculate the total intervals based on granularity in the date range
    total_intervals = (end_timestamp - start_timestamp) // delta + 1

    # Initialize the result list with zeros for the entire range
    result = [0] * total_intervals

    # Parse strings, then truncate strings and datetime objects alike so that
    # both kinds of input land in the same bucket.
    parsed = (ts if isinstance(ts, datetime) else datetime.strptime(ts, fmt) for ts in timestamps)
    timestamp_counts = Counter(ts.replace(**truncate) for ts in parsed)

    # For each unique bucket, add its count at the corresponding index
    for timestamp, count in timestamp_counts.items():
        idx = (timestamp - start_timestamp) // delta
        # Skip anything outside the window: a negative index would silently
        # wrap to the end of the list and an index past the end would raise.
        if 0 <= idx < total_intervals:
            result[idx] += count

    return result


## 4. run_length_encode_zeros
- **Purpose**: Encodes sequences of zeros from a timestamp frequency list.
- **Description**: After calculating the timestamp frequency with the given granularity, the function performs run-length encoding focusing on zero sequences. Positive values in the output represent original frequency counts, while negative values represent consecutive runs of zeros.


In [None]:
def run_length_encode_zeros(array, granularity):
    """
    Run-length encode the zero stretches of a timestamp frequency list.

    The timestamps in `array` are first converted to per-interval counts with
    `timestamp_frequency(array, granularity)`. Non-zero counts are copied to
    the output unchanged; each maximal run of consecutive zeros is collapsed
    into a single negative number whose magnitude is the length of the run.

    Returns:
    - list[int]: the encoded series.
    """
    frequencies = timestamp_frequency(array, granularity)
    runs = []
    gap = 0  # length of the zero run currently being accumulated

    for count in frequencies:
        if count != 0:
            # A non-zero count ends the current zero run, if there is one.
            if gap > 0:
                runs.append(-gap)
                gap = 0
            runs.append(count)
            continue
        gap += 1

    # Flush a zero run that reaches the end of the series
    if gap > 0:
        runs.append(-gap)

    return runs

## 5. ConstrainedAWARP
- **Purpose**: Computes the constrained adaptive time warping distance between two sequences.
- **Description**: This function determines the optimal alignment between two time series while considering time shifts and distortions. It uses a dynamic programming approach, constructing a matrix where each cell represents the warping cost up to those elements of both sequences. The overall warping distance is the value in the matrix's bottom-right corner, and this matrix is also returned for detailed insights.

In [None]:
def _cumulative_timestamps(encoded):
    """Return, for each element of a run-length encoded series, the time elapsed once it is consumed.

    A positive value is an event and advances time by 1; any other value is a
    gap and advances time by its absolute value.
    """
    times = []
    elapsed = 0
    for value in encoded:
        elapsed += 1 if value > 0 else abs(value)
        times.append(elapsed)
    return times


def ConstrainedAWARP(x, y, w):
    """
    Compute the constrained AWARP distance between two run-length encoded series.

    Parameters:
    - x, y: sequences of numbers where positive values are event counts and
      negative values are runs of zeros (the magnitude is the run length).
      The inputs are only read, never modified, so the same sequence can be
      passed to several calls.
    - w: maximum allowed time gap between aligned elements; use `np.inf` for
      an unconstrained alignment.

    Returns:
    - (d, D): `d` is the square root of the bottom-right cell of the cost
      matrix, and `D` is the full (len(x)+1, len(y)+1) cost matrix.
    """
    n = len(x)
    m = len(y)

    # Create a 2D array filled with infinity for dynamic programming.
    D = np.full((n + 1, m + 1), np.inf)
    D[0][0] = 0  # Base case

    # Timestamps of the elements of each series, used for the window test.
    tx = _cumulative_timestamps(x)
    ty = _cumulative_timestamps(y)

    # Iterate through both sequences to compute the warping cost.
    for i in range(n):
        for j in range(m):
            gap = abs(tx[i] - ty[j])

            # If the gap between events is larger than w, set to infinity.
            if gap > w and ((j > 0 and ty[j-1] - tx[i] > w) or (i > 0 and tx[i-1] - ty[j] > w)):
                D[i+1][j+1] = np.inf
            else:
                # Initialize three possible costs (from diagonal, left, and top).
                a1, a2, a3 = np.inf, np.inf, np.inf

                # Cost calculation for matching x[i] and y[j].
                if x[i] > 0 and y[j] > 0 and gap <= w:
                    a1 = D[i][j] + (x[i] - y[j]) ** 2
                elif x[i] < 0 and y[j] < 0:
                    a1 = D[i][j]
                elif x[i] > 0 and y[j] < 0:
                    a1 = D[i][j] + x[i] ** 2 * (-y[j])
                elif x[i] < 0 and y[j] > 0:
                    a1 = D[i][j] + y[j] ** 2 * (-x[i])

                # Cost calculation for matching x[i] and a gap in y.
                if x[i] > 0 and y[j] > 0 and gap <= w:
                    a2 = D[i+1][j] + (x[i] - y[j]) ** 2
                elif x[i] < 0 and y[j] < 0:
                    a2 = D[i+1][j]
                elif x[i] < 0 and y[j] > 0:
                    a2 = D[i+1][j] + y[j] ** 2
                elif x[i] > 0 and y[j] < 0 and gap <= w:
                    a2 = D[i+1][j] + x[i] ** 2 * (-y[j])

                # Cost calculation for matching y[j] and a gap in x.
                if x[i] > 0 and y[j] > 0 and gap <= w:
                    a3 = D[i][j+1] + (x[i] - y[j]) ** 2
                elif x[i] < 0 and y[j] < 0:
                    a3 = D[i][j+1]
                elif x[i] > 0 and y[j] < 0:
                    a3 = D[i][j+1] + x[i] ** 2
                elif x[i] < 0 and y[j] > 0 and gap <= w:
                    a3 = D[i][j+1] + y[j] ** 2 * (-x[i])

                # Store the minimum of the three computed costs.
                D[i+1][j+1] = min(a1, a2, a3)

    # Return the square root of the final cell value as the distance and the matrix D.
    d = np.sqrt(D[n][m])
    return d, D


# ETL of the Databases

In [None]:
# Spark is only needed by the ETL cells, so it is imported here by name.
# A wildcard import of pyspark.sql.functions is avoided on purpose: it would
# shadow built-ins such as `min` and `abs` that the helper functions rely on.
from pyspark.sql import SparkSession

# Local Spark session using every available core
spark = SparkSession.builder.master("local[*]").appName('TCC').getOrCreate()
sc = spark.sparkContext

# Location of the raw comments export inside the mounted Google Drive
file_name = 'basics.csv'
drive_path= 'My Drive/TCC'
drive.mount('/content/drive')

# Read the CSV with a header row and let Spark infer the column types
df = spark.read.csv(f'/content/drive/{drive_path}/{file_name}', inferSchema=True, header=True)

In [None]:
# Only the two column helpers used below are imported, by name, so that
# built-ins such as `min` and `abs` are not shadowed by a wildcard import.
from pyspark.sql.functions import col, collect_list

#Deduplicate
df_basic = df.dropDuplicates()
#Consider only the experiment date range
# NOTE(review): only the upper bound of the range is enforced here; rows dated
# before the start of the experiment window are kept — confirm this is intended.
df_basic = df_basic.filter("publishedAt <= '2022-11-30 23:59:59'")
#Get number of comments per user
df_comments_per_user = df_basic.groupby("authorChannelId").count()
#Filter out all those with 500 or less comments
df_high_commenters = df_comments_per_user.filter("count>500").withColumnRenamed("authorChannelId", 'authorChannelId_HC')
#Build a dataframe with only the comments of those with more than 500 comments
high_commenters_dates = df_high_commenters[['authorChannelId_HC']].join(df_basic[['authorChannelId', 'publishedAt']], df_high_commenters.authorChannelId_HC == df_basic.authorChannelId, 'left').drop('authorChannelId_HC')
#Build a dataframe with one row per user, where the row contains the user ID and a list of all the dates of the comments published by the user
# The aggregated column is named explicitly: later cells look it up as
# "collect_list(publishedAt)", and the auto-generated name of an aggregate
# over a cast expression is not guaranteed to match that string.
dates_as_arrays = high_commenters_dates.groupby("authorChannelId").agg(
    collect_list(col("publishedAt").cast("string")).alias("collect_list(publishedAt)")
)
dates_as_arrays.toPandas().to_csv('/content/drive/My Drive/TCC/date_array.csv', index=False)

# Creation of Dataset with encoded time series

In [None]:
# Names needed to declare the UDFs, imported by name so that no built-in
# is shadowed by a wildcard import of pyspark.sql.functions.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# One UDF per granularity; each wraps run_length_encode_zeros with the
# granularity fixed and returns an array of integers.
udf_run_length_encode_zeros_seconds = udf(lambda x: run_length_encode_zeros(x, 'seconds'), ArrayType(IntegerType()))
udf_run_length_encode_zeros_minutes = udf(lambda x: run_length_encode_zeros(x, 'minutes'), ArrayType(IntegerType()))
udf_run_length_encode_zeros_hours = udf(lambda x: run_length_encode_zeros(x, 'hours'), ArrayType(IntegerType()))

# Apply each UDF to the list of comment dates and drop the raw list afterwards
df_sec = dates_as_arrays.withColumn("encode_sec", udf_run_length_encode_zeros_seconds(dates_as_arrays["collect_list(publishedAt)"])).drop("collect_list(publishedAt)")
df_min = dates_as_arrays.withColumn("encode_min", udf_run_length_encode_zeros_minutes(dates_as_arrays["collect_list(publishedAt)"])).drop("collect_list(publishedAt)")
df_hou = dates_as_arrays.withColumn("encode_hou", udf_run_length_encode_zeros_hours(dates_as_arrays["collect_list(publishedAt)"])).drop("collect_list(publishedAt)")

# Persist one CSV per granularity
df_sec.toPandas().to_csv('/content/drive/My Drive/TCC/encoded_seconds.csv', index=False)
df_min.toPandas().to_csv('/content/drive/My Drive/TCC/encoded_minutes.csv', index=False)
df_hou.toPandas().to_csv('/content/drive/My Drive/TCC/encoded_hours.csv', index=False)

In [None]:
# Collect the seconds-encoded DataFrame to pandas. `withColumn` is a method,
# not a DataFrame, so `toPandas()` has to be called on `df_sec` itself.
p = df_sec.toPandas()

p.to_csv('/content/drive/My Drive/TCC/t.csv', index=False)

# Conversion of Time Series into Images

In [None]:
# Render one image per author and record which image number belongs to whom.

image_type = 'hexbin'
unit = 'seconds'
folder = f'days_{unit}'

# Experiment window and the timestamp format shared by every call below
window_start = '2022-09-01 00:00:01'
window_end = '2022-11-30 23:59:59'
date_fmt = '%Y-%m-%d %H:%M:%S'

data = []

# The row index must stay bound to the module-level name `x`: generate_image
# reads that name to build the file name of the image it saves.
for x,y in dates_as_arrays.toPandas().iterrows():
  # Create a fresh all-zero sparse matrix for this author
  sparse_matrix = create_sparse_matrix(window_start, window_end, unit)
  author = y['authorChannelId']
  # publishedAt is cast to string before being collected, while generate_image
  # subtracts datetimes, so the values are parsed here.
  dates = [d if isinstance(d, datetime) else datetime.strptime(d, date_fmt) for d in y['collect_list(publishedAt)']]
  data.append((author, x))
  generate_image(dates, unit, sparse_matrix, image_type, window_start, grid=100)
  print(x)


df_pandas = pd.DataFrame(data, columns=['authorChannelId', 'imgNumber'])
# Write the author-to-image map next to the images of the selected unit
df_pandas.to_csv(f"/content/drive/My Drive/TCC/images_dir/{folder}/{image_type}_image_map.csv", index=False, sep=";")

In [None]:
from google.colab import drive
import pandas as pd
import numpy as np
drive.mount('/content/drive')
df = pd.read_csv(f'/content/drive/My Drive/TCC/encoded_hours.csv')

# First row to use as the left-hand side of a pair.
# NOTE(review): presumably a resume offset from an earlier, interrupted run — confirm.
START_ROW = 100

# Rows are selected by position; every later row is compared against each of them.
pending = df.iloc[START_ROW:]
author_ids = pending['authorChannelId'].tolist()
# Parse each stored list such as "[3, -2, 1]" once instead of once per pair.
encoded_series = [[int(token) for token in text.strip('[]').split(',')] for text in pending['encode_hou']]

for first in range(len(author_ids)):
  for second in range(first + 1, len(author_ids)):
    # Copies are passed so that a callee which appends to its arguments cannot
    # alter the series that is reused for the next pair.
    distance = ConstrainedAWARP(list(encoded_series[first]), list(encoded_series[second]), np.inf)[0]
    print(author_ids[first], author_ids[second], distance)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
UCL7lmLAr69g7F5nhNnH3otw UCLAH57g3zXP4L0VWoMIzcow 4.0
UCL7lmLAr69g7F5nhNnH3otw UCLGl3f23i0Uq1BhuJP6Qc6A 9.38083151964686
UCL7lmLAr69g7F5nhNnH3otw UCLJbCSgRstqI_xgTj-EzFxQ 2.0
UCL7lmLAr69g7F5nhNnH3otw UCLbdxwzNrvJt-8ZznBdnqmg 3.3166247903554
UCL7lmLAr69g7F5nhNnH3otw UCLfnU4AebeC6T1yyTft-3-Q 10.099504938362077
UCL7lmLAr69g7F5nhNnH3otw UCLm87UV5VLe__Zd-WufggoA 11.40175425099138
UCL7lmLAr69g7F5nhNnH3otw UCM4mGGDJorCISmfuiFHeq5A 11.090536506409418
UCL7lmLAr69g7F5nhNnH3otw UCM7hvEMA60pkjgljyUnFGMQ 13.076696830622021
UCL7lmLAr69g7F5nhNnH3otw UCMBIeaJ-gZJU2cm6ZyD6mpg 5.291502622129181
UCL7lmLAr69g7F5nhNnH3otw UCMEwrZf1tu1mMX7cPKazVmQ 8.48528137423857
UCL7lmLAr69g7F5nhNnH3otw UCMKRtLxnlenU4iRBTiqBCUQ 9.486832980505138
UCL7lmLAr69g7F5nhNnH3otw UCM_BaqWFIC7mtoboBpcdBeA 6.082762530298219
UCL7lmLAr69g7F5nhNnH3otw UCMj6d0hevGiK0i7rwu1eCJA 8.774964387392123
UCL7lmLAr69g7F5n

KeyboardInterrupt: ignored