<a href="https://colab.research.google.com/github/PiotrMaciejKowalski/BigData2022-films/blob/Refactoring-kodu-cleaning-data/colabs/datacleaning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark Setup and Data Load

In [1]:
# Mount Google Drive so the shared project data under /content/drive is readable.
from google.colab import drive

MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT)

Mounted at /content/drive


In [2]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark2.4.5
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
# install findspark
!pip install -q findspark
# clone github repo
!git clone https://github.com/PiotrMaciejKowalski/BigData2022-films
# Przeniesienie plików z BigData2022-films do katalogu nadrzędnego
!mv BigData2022-films/* .
!mv BigData2022-films/.* .
!rmdir BigData2022-films

Cloning into 'BigData2022-films'...
remote: Enumerating objects: 1079, done.[K
remote: Counting objects: 100% (393/393), done.[K
remote: Compressing objects: 100% (207/207), done.[K
remote: Total 1079 (delta 273), reused 212 (delta 186), pack-reused 686[K
Receiving objects: 100% (1079/1079), 2.70 MiB | 18.55 MiB/s, done.
Resolving deltas: 100% (635/635), done.
mv: cannot move 'BigData2022-films/.' to './.': Device or resource busy
mv: cannot move 'BigData2022-films/..' to './..': Device or resource busy


In [3]:
!git checkout Refactoring-kodu-cleaning-data

Branch 'Refactoring-kodu-cleaning-data' set up to track remote branch 'Refactoring-kodu-cleaning-data' from 'origin'.
Switched to a new branch 'Refactoring-kodu-cleaning-data'


In [4]:
import os

# Point the Spark session at the JDK and the Spark distribution installed in the
# setup cell. NOTE(review): presumably these are read by lib.pyspark_startup
# (e.g. via findspark) — confirm; they are set before it is imported.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.2.1-bin-hadoop3.2",
    }
)

from lib.pyspark_startup import init, load

In [5]:
# Create the SparkSession through the project helper lib.pyspark_startup.init.
# NOTE(review): configuration (memory, app name, ...) lives in that helper — not visible here.
spark = init()

In [6]:
# Load the data from the shared Google Drive folder.
# `path` is the data directory (with trailing slash) reused by later cells.
path = "/content/drive/.shortcut-targets-by-id/1VcOir9FMG8LzEsUE-Q8YA79c_sV0tJwp/bigdata2022/"

joined_parquet = f"{path}joined_df.parquet"
df = spark.read.parquet(joined_parquet)

# Data cleaning

## Odfiltrowanie odcinków

In [7]:
# Read the raw title.episode table (tab-separated, with a header row) from the
# same data directory as the joined dataset, reusing `path` instead of
# repeating the directory literal.
title_episode = spark.read.csv(
    path + "title.episode.tsv.gz",
    sep="\t",
    header=True,
)

In [8]:
# Left-anti join: keep only rows of df whose id does NOT appear as an episode
# tconst in title_episode, i.e. drop individual episodes.
is_episode = df.id == title_episode.tconst
df = df.join(title_episode, on=is_episode, how="leftanti")

## Konwersja typów kolumn

In [9]:
from lib.pyspark_preprocesing import convert_types

# Cast numeric columns to int with the project helper convert_types.
# NOTE(review): presumably these arrive as strings from the source data — confirm.
int_columns = [
    "liczba_sezonow",
    "liczba_wszystkich_odcinkow",
    "dlugosc_produkcji_w_min",
    "rok_wydania_produkcji",
    "rok_zakonczenia_produkcji",
]
df = convert_types(df, int_columns, "int")

## Uzupełnienie wartości dla filmów

In [11]:
from lib.pyspark_preprocesing import value_overwrite

# Fill in values for non-series production types using the project helper
# value_overwrite. From the argument layout, presumably: for rows whose
# production type is in `single_release_types`, set the season and episode
# counts to 1 and the end year to the value of the "rok_wydania_produkcji"
# (release year) column. NOTE(review): confirm against lib.pyspark_preprocesing;
# also check whether tvMiniSeries (usually multi-episode) belongs in this list.
overwrite_columns = ["liczba_sezonow", "liczba_wszystkich_odcinkow", "rok_zakonczenia_produkcji"]
overwrite_values = [1, 1, "rok_wydania_produkcji"]
single_release_types = ["short", "movie", "video", "tvMovie", "tvMiniSeries", "tvSpecial", "tvShort"]

df = value_overwrite(df, overwrite_columns, overwrite_values, single_release_types)

## Usunięcie nulli

In [13]:
# Drop every row that has a null in any of the first ten columns.
# NOTE(review): the selection is positional — it silently changes meaning if the
# column order of joined_df.parquet changes; listing column names would be safer.
first_ten_columns = df.columns[:10]
df = df.dropna(how="any", subset=first_ten_columns)

## Odfiltrowanie niechcianych rodzajów produkcji

In [14]:
# Exclude unwanted production types. As with the chained `!=` comparisons,
# rows with a null rodzaj_produkcji evaluate to null and are filtered out too.
excluded_types = ["videoGame", "tvEpisode", "tvPilot"]
df = df.filter(~df.rodzaj_produkcji.isin(excluded_types))

## Usunięcie produkcji z błędnymi typami


In [15]:
# Read the raw title.basics table (tab-separated, with a header row) from the
# same data directory as the joined dataset, reusing `path` instead of
# repeating the directory literal.
title_basics = spark.read.csv(
    path + "title.basics.tsv.gz",
    sep="\t",
    header=True,
)

In [16]:
# Attach to every episode the basics record of its parent production.
parent_matches = title_episode.parentTconst == title_basics.tconst
df1 = title_episode.join(title_basics, on=parent_matches, how="inner")

In [17]:
# Keep episodes whose parent is NOT typed as a series — these parents have a
# suspicious title type. Null titleType rows are dropped, as with `!=`.
series_types = ["tvSeries", "tvMiniSeries"]
df1 = df1.filter(~df1.titleType.isin(series_types))

In [18]:
# Bring the distinct ids of those suspicious parent productions to the driver.
distinct_ids = [row["parentTconst"] for row in df1.select("parentTconst").distinct().collect()]

In [19]:
# Load the black list of ids with a wrong title type: one entry per line,
# trailing whitespace (including the newline) stripped.
with open('docs/black_list.txt', 'r') as f:
    wrong_titleType = [entry.rstrip() for entry in f]

In [20]:
# Split the suspicious parent ids into:
#  - ids present in the black list -> removed from df,
#  - all others -> kept in `podejrzane_zle` ("suspected wrong") for review,
# preserving the order of distinct_ids.
black_list = set(wrong_titleType)  # O(1) membership instead of scanning a list
confirmed_wrong = [element for element in distinct_ids if element in black_list]
podejrzane_zle = [element for element in distinct_ids if element not in black_list]

# One `isin` over all confirmed ids keeps the Spark plan flat; chaining one
# `filter` per id (as before) made the plan grow with the black list size.
# The guard keeps the "nothing matched -> df unchanged" behaviour.
if confirmed_wrong:
    df = df.filter(~df['id'].isin(confirmed_wrong))

## Zapis na dysku

In [23]:
# Persist the cleaned dataset next to the input, replacing any previous output.
df.write.parquet(path + "clean_df.parquet", mode="overwrite")