# Clone the Git repository

In [None]:
# Fetch the project repository; later cells import its `utils` package and
# place the radar images under its `data/` folder.
!git clone https://github.com/One-Night-Miracle/Data-Science-Project-2-2021-2-Nowcasting.git

# Download Images

In [None]:
!gdown --id 1g9EgBESpAHITN6USegHQI5pV6elEb_u2
!unzip bkk_radar_images_dBZ_1.zip
!rm bkk_radar_images_dBZ_1.zip

In [None]:
!mkdir /content/Data-Science-Project-2-2021-2-Nowcasting/data/bkk_radar_images_dBZ
!mv /content/bkk_radar_images_dBZ_1 /content/Data-Science-Project-2-2021-2-Nowcasting/data/bkk_radar_images_dBZ

# Spark Preparation

In [None]:
# Detect whether the notebook runs on Google Colab by probing for its package.
# Only ImportError is caught so that unrelated failures (and Ctrl-C) are not
# silently turned into "not on Colab".
try:
  import google.colab  # imported only to test availability
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [None]:
if IN_COLAB:
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
  !tar xf spark-3.2.1-bin-hadoop3.2.tgz
  !mv spark-3.2.1-bin-hadoop3.2 spark
  !pip install -q findspark

In [None]:
if IN_COLAB:
  import os
  # Point the JVM and Spark lookups at the locations installed on Colab.
  os.environ.update({
      "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
      "SPARK_HOME": "/content/spark",
  })

## Start a Local Cluster

In [None]:
import findspark
# Must run before `pyspark` is imported in the cells below.
# NOTE(review): expected to locate Spark through SPARK_HOME set on Colab -- confirm for non-Colab runs.
findspark.init()

In [None]:
# Spark master URL handed to SparkSession.builder.master(); 'local' keeps
# everything on this machine.
cluster_url = 'local'

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .master(cluster_url)
    .appName("Colab")
    .config('spark.ui.port', '4040')
    .getOrCreate()
)

# Handle to the underlying SparkContext, used for the RDD operations below.
sc = spark.sparkContext

# Import libraries and define the Dataloader

In [None]:
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib as mpl
import matplotlib.pyplot as plt
# from scipy.spatial.distance import mahalanobis

In [None]:
import sys, os
# Put the cloned repository first on the import path so its `utils` package
# can be imported.
sys.path.insert(0,'/content/Data-Science-Project-2-2021-2-Nowcasting')

In [None]:
from utils.config import cfg
from utils.tools import image

In [None]:
class Dataloader():
    """Load radar frames listed in a pickled DataFrame as flattened rows.

    Args:
        pd_path: Path to a pickled pandas DataFrame; `_load` reads its
            'FileName' column.
        folder_path: Directory prefix (with trailing slash) prepended to each
            file name. Defaults to the Colab location used by this notebook.
        max_frames: Maximum number of frames read by `_load`, counted from
            the top of the DataFrame. Use None to read every frame.

    Attributes:
        df: The loaded DataFrame, with a 'FolderPath' column added.
        data: None until `_load` runs, then one flattened row per frame.
    """

    DEFAULT_FOLDER = '/content/Data-Science-Project-2-2021-2-Nowcasting/data/bkk_radar_images_dBZ/bkk_radar_images_dBZ_1/'

    def __init__(self, pd_path, folder_path=DEFAULT_FOLDER, max_frames=100):
        self.data = None
        self.max_frames = max_frames
        # NOTE: unpickling can execute arbitrary code; only load trusted files.
        self.df = pd.read_pickle(pd_path)
        self.df['FolderPath'] = folder_path

    def _load(self):
        """Read the first `max_frames` images and store them in `self.data`."""
        self.df['img_path'] = self.df['FolderPath'] + self.df['FileName']
        # Slicing with None keeps the whole list.
        paths = self.df['img_path'].tolist()[:self.max_frames]
        # Presumably returns an array with frames on the first axis -- TODO confirm.
        self.data = image.quick_read_frames(path_list=paths, grayscale=True)
        # Flatten every frame into a single row: (n_frames, n_pixels).
        self.data = self.data.reshape(self.data.shape[0], -1)

In [None]:
# Path of the metadata pickle, taken from the project config.
pd_path = cfg.ONM_PD.FOLDER_1
dl = Dataloader(pd_path)
# Read the frames into dl.data (one flattened row per frame).
dl._load()

In [None]:
# dl.data = dl.data.swapaxes(0,1)
# Show the loaded shape: frames on the first axis, flattened pixels on the second.
dl.data.shape

# RDD Operations

In [None]:
# Convert the frames to plain nested Python lists for sc.parallelize.
# tolist() already yields one list per row, so no per-row copy is needed.
data = dl.data.tolist()

In [None]:
# Drop the loader (and its array) now that the data lives in `data`.
del dl

In [None]:
# Distribute the frames as an RDD with one element per flattened frame.
rdd = sc.parallelize(data)

In [None]:
# Drop the driver-side list reference once the RDD has been created from it.
del data

### flatMap: (location, pixel_value, count)

In [None]:
# Explode each frame into (location, pixel_value, 1) records.
rdd2 = rdd.flatMap(lambda frame: [(loc, px, 1) for loc, px in enumerate(frame)])

In [None]:
# rdd2.take(5)

### filter: keep pixel_value > 0

In [None]:
# Keep only records whose pixel value is strictly positive.
rdd3 = rdd2.filter(lambda record: record[1] > 0)

In [None]:
# rdd3.take(5)

### map: create key {location + "_" + pixel_value}

In [None]:
def createKey(x):
  """Turn a (location, pixel_value, count) record into a keyed pair.

  Returns:
    ("<location>_<pixel_value>", count), suitable for reduceByKey.
  """
  merged = "_".join((str(x[0]), str(x[1])))
  return (merged, x[2])

In [None]:
# Re-key every record as ("location_pixel", count).
rdd4 = rdd3.map(createKey)

In [None]:
# rdd4.take(5)

### reduceByKey: sum counts per {location + "_" + pixel_value}

In [None]:
# Add up the counts of records that share the same "location_pixel" key.
rdd5 = rdd4.reduceByKey(lambda total, count: total + count)

In [None]:
# rdd5.take(5)

### map: deMergeKey {pixel_value}

In [None]:
def deMergeKey(x, keep_loc=False):
  """Split a ("location_pixel", count) pair back into its parts.

  Args:
    x: Pair whose first item is a "<location>_<pixel_value>" string.
    keep_loc: When True, keep the location in the result.

  Returns:
    (location, pixel_value, count) if keep_loc else (pixel_value, count);
    location and pixel_value are returned as strings.
  """
  parts = x[0].split('_')
  loc, px = parts
  if not keep_loc:
    return (px, x[1])
  return (loc, px, x[1])

In [None]:
# Variant that keeps the location: (location, pixel_value, count).
rdd6_loc = rdd5.map(lambda pair: deMergeKey(pair, keep_loc=True))

In [None]:
# Drop the location so records are keyed by pixel value only.
rdd6 = rdd5.map(deMergeKey)

In [None]:
# rdd6.take(3)

### reduceByKey: sum counts per pixel_value

In [None]:
# Add up the counts of records that share the same pixel value.
rdd7 = rdd6.reduceByKey(lambda total, count: total + count)

In [None]:
# rdd7.take(5)

### Create Features

In [None]:
# Collect the per-pixel-value totals to the driver as (pixel_value, count)
# pairs. Keys come back from deMergeKey as strings, so cast them to int.
# (The original line was missing the tuple's opening parenthesis.)
raw_tuple = [(int(px), count) for px, count in rdd7.collect()]
raw_tuple

In [None]:
# Convert the list of pairs into a 2-column NumPy array.
raw_tuple = np.array(raw_tuple)

In [None]:
# Display the array form of the (pixel_value, count) pairs.
raw_tuple

In [None]:
# Persist the array. np.save returns None, so its result must not be assigned
# back to `raw_tuple` (that would discard the in-memory array).
np.save("raw_tuple.npy", raw_tuple)

In [None]:
# features = np.zeros((255,))
# for key, val in raw_tuple:
#   features[key] = key*val

In [None]:
# Collect (loc_x, loc_y, pixel_value, count) rows on the driver.
# The flat location index is split with a stride of 2034.
# NOTE(review): assumes flattened frames have a row length of 2034 -- confirm against the image size.
raw_tuple_loc = [
    (int(loc) % 2034, int(loc) // 2034, int(px), count)
    for loc, px, count in rdd6_loc.collect()
]

In [None]:
# Convert the list of rows into a 4-column NumPy array.
raw_tuple_loc = np.array(raw_tuple_loc)

In [None]:
# Persist the array. np.save returns None, so its result must not be assigned
# back to `raw_tuple_loc` (that would discard the in-memory array).
np.save("raw_tuple_loc.npy", raw_tuple_loc)

In [None]:
# features_wloc = np.zeros((2034,2048,255))

In [None]:
# for loc_x, loc_y, key, val in raw_tuple_loc:
#   features_wloc[loc_x][loc_y][key] = val

In [None]:
# del raw_tuple_loc
# del features_wloc

In [None]:
# mu = np.sum(features)/(2034*2048)

In [None]:
# S = (features_wloc - mu) @ (features_wloc - mu).T