<a href="https://colab.research.google.com/github/PiotrMaciejKowalski/BigData2022-actors/blob/poprawa_danych/colabs/Zmiana_typu_danych.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Sparka

In [None]:
!pip install pyspark py4j
!pip install -q findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget ftp://ftp.ps.pl/pub/apache/spark/spark-3.3.1/spark-3.3.1-bin-hadoop2.tgz
!tar xf spark-3.3.1-bin-hadoop2.tgz

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 38 kB/s 
[?25hCollecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 58.6 MB/s 
[?25h  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark


In [None]:
import pyspark
import findspark
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import split, col
from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType, FloatType, TimestampType, DateType, ArrayType, MapType
from typing import List, Tuple, Dict, Any
from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import monotonically_increasing_id 
import numpy

In [None]:
# Point PySpark at the JDK and at the Spark distribution unpacked in the setup cell,
# then start (or reuse) the session; the bare `spark` at the end displays it.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop2",
})
spark = (
    SparkSession.builder
    .appName('Colab')
    .getOrCreate()
)
spark

# Pobranie danych

## Import danych

In [None]:
!wget https://datasets.imdbws.com/name.basics.tsv.gz
!wget https://datasets.imdbws.com/title.akas.tsv.gz
!wget https://datasets.imdbws.com/title.basics.tsv.gz
!wget https://datasets.imdbws.com/title.crew.tsv.gz
!wget https://datasets.imdbws.com/title.episode.tsv.gz
!wget https://datasets.imdbws.com/title.principals.tsv.gz
!wget https://datasets.imdbws.com/title.ratings.tsv.gz

## Rozpakowanie danych

In [None]:
!gzip -dc /content/name.basics.tsv.gz > name.basics.csv
!gzip -dc /content/title.akas.tsv.gz > title.akas.csv
!gzip -dc /content/title.basics.tsv.gz > title.basics.csv
!gzip -dc /content/title.crew.tsv.gz > title.crew.csv
!gzip -dc /content/title.episode.tsv.gz > title.episode.csv
!gzip -dc /content/title.principals.tsv.gz > title.principals.csv
!gzip -dc /content/title.ratings.tsv.gz > title.ratings.csv

# Wczytanie danych

## Ustalenie typów danych

In [None]:
# Translation table from the Python-level type labels used in `column_type_collection`
# to the Spark SQL types that end up in the table schemas.
map_types = dict([
    # scalar types
    (str, StringType()),
    (int, IntegerType()),
    (bool, BooleanType()),
    (float, FloatType()),
    # temporal types, keyed by plain string labels
    ('timestamp', TimestampType()),
    ('date', DateType()),
    # container types, keyed by `typing` generics
    (List[str], ArrayType(StringType())),
    (Tuple[str], ArrayType(StringType())),
    (Dict[str, str], MapType(StringType(), StringType())),
])

In [None]:
# Column names of every table, in the order of the file's columns. Order matters:
# Spark applies a user-supplied CSV schema to the columns by position, so a missing
# or misspelled name here shifts every later column onto the wrong data.
column_conf = {
  'akas'  :  ['titleId','ordering','title','region','language','types','attributes','isOriginalTitle'],
  'title_basics' : ['tconst','titleType','primaryTitle','originalTitle','isAdult','startYear','endYear','runtimeMinutes','genres'],
  'crew' : ['tconst','directors','writers'],
  'episode' : ['tconst','parentTconst','seasonNumber','episodeNumber'],
  'principals' : ['tconst','ordering','nconst','category','job','characters'],
  'ratings' : ['tconst','averageRating','numVotes'],
  'name_basics' : ['nconst','primaryName','birthYear','deathYear','primaryProfession','knownForTitles']
}
# Python-level type label (a key of `map_types`) -> columns of that type.
# Every name used in `column_conf` must appear here exactly once.
column_type_collection = {
    int : ['ordering', 'startYear', 'endYear', 'runtimeMinutes', 'seasonNumber', 'episodeNumber', 'numVotes', 'birthYear', 'deathYear' ],
    str : ['titleId', 'title', 'region', 'language', 'types', 'attributes', 'tconst', 'nconst', 'titleType', 'primaryTitle', 'originalTitle', 'genres', 'directors', 'writers', 'parentTconst', 'category', 'job', 'characters', 'primaryName', 'primaryProfession', 'knownForTitles'],
    # NOTE(review): IMDb documents these flags as 0/1. Spark's CSV reader parses
    # BooleanType only from 'true'/'false', so they may load as null. Verify, and
    # consider reading them as int and casting to boolean afterwards.
    bool : ['isOriginalTitle', 'isAdult' ],
    float : ['averageRating']
}

In [None]:
def init_schema(conf, column_type_collection):
  """Build a Spark ``StructType`` for one table.

  Args:
    conf: column names of the table, in file order (a CSV schema is applied
      by position, so this order must match the file).
    column_type_collection: mapping from a type label (a key of ``map_types``)
      to the list of column names that should get that type.

  Returns:
    A ``StructType`` with one nullable field per entry of ``conf``, in order.
    Surrounding whitespace in a column name is stripped.

  A column that has no entry in ``column_type_collection`` is NOT dropped:
  dropping it would shift every later column when the schema is applied by
  position. Instead it falls back to ``StringType`` and a warning is emitted.
  """
  import warnings

  # Reverse index: column name -> type label. As in a scan over the labels,
  # the last label that lists a name wins.
  label_of_column = {}
  for python_type, column_list in column_type_collection.items():
    for name in column_list:
      label_of_column[name] = python_type

  schemat = StructType()
  for pole in conf:
    name = pole.strip()  # a stray space in the config must not hide the column
    if name in label_of_column:
      spark_type = map_types[label_of_column[name]]
    else:
      warnings.warn(f"init_schema: no type configured for column {name!r}; using StringType")
      spark_type = StringType()
    schemat = schemat.add(name, spark_type, True)
  return schemat

In [None]:
# One schema per table, in the same table order as the file list `CSV`;
# each schema is also bound to its own name for direct use.
Schematy = [
  init_schema(column_conf[table], column_type_collection)
  for table in ('akas', 'title_basics', 'crew', 'episode', 'principals', 'ratings', 'name_basics')
]
(schemat_title_akas, schemat_title_basics, schemat_title_crew, schemat_title_episode,
 schemat_title_principals, schemat_title_ratings, schemat_name_basics) = Schematy

## Wczytajmy dane z rozpakowanych plików

In [None]:
# Decompressed dump files, listed in the same table order as `Schematy`.
CSV = [
  f"{table}.csv"
  for table in ('title.akas', 'title.basics', 'title.crew', 'title.episode',
                'title.principals', 'title.ratings', 'name.basics')
]

In [None]:
def upload(schemat, csv):
  """Read one decompressed IMDb dump into a DataFrame with the given schema.

  Args:
    schemat: ``StructType`` applied to the file's columns.
    csv: path to the tab-separated file; its first line is a header row.

  Returns:
    A lazily evaluated Spark DataFrame.
  """
  return (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")
    # IMDb writes missing values as the literal \N; read them as SQL NULL instead
    # of the two-character string (or a failed int parse).
    .option("nullValue", "\\N")
    # IMDb TSV fields are not CSV-quoted, but titles can contain a bare '"'.
    # An empty quote string turns quote handling off so such fields are read verbatim.
    .option("quote", "")
    .schema(schemat)
    .csv(csv)
  )

In [None]:
# Read every file with its matching schema; `Schematy` and `CSV` share one table order.
(df_title_akas, df_title_basics, df_title_crew, df_title_episode,
 df_title_principals, df_title_ratings, df_name_basics) = [
  upload(schema, path) for schema, path in zip(Schematy, CSV)
]

## Wyświetlmy dane

In [None]:
# Preview the first 3 rows of every loaded table.
for frame in (df_name_basics, df_title_akas, df_title_basics, df_title_crew,
              df_title_episode, df_title_principals, df_title_ratings):
  frame.show(3)

# Zamiana typu string na array&lt;string&gt;

Zmienimy następujące kolumny:
- w tabeli `df_title_akas`: `attributes`, `types`;
- w tabeli `df_title_basics`: `genres`;
- w tabeli `df_name_basics`: `primaryProfession`, `knownForTitles`;
- w tabeli `df_title_crew`: `directors`, `writers`.


## Konwersja wskazanych wyżej kolumn typu string na kolumny typu array

In [None]:
def split_to_array(df, columns, sep=","):
  """Return ``df`` with each listed string column replaced by an ``array<string>``.

  Args:
    df: source DataFrame.
    columns: names of the separator-delimited string columns to convert.
    sep: pattern passed to ``split`` (a Java regex; "," matches a literal comma).

  Returns:
    A new DataFrame with the same columns in the same order. A listed column
    that is not of type string (e.g. already converted on an earlier run of
    this cell) is left unchanged, so re-running the cell is harmless.

  Raises:
    KeyError: if a listed column does not exist in ``df``.
  """
  column_types = dict(df.dtypes)
  for name in columns:
    if column_types[name] == "string":
      df = df.withColumn(name, split(col(name), sep))
  return df

# Convert in place with withColumn so every value stays on its own row. Splitting
# into separate DataFrames and re-joining them on monotonically_increasing_id() is
# unreliable, because those ids depend on partitioning.
df_title_akas = split_to_array(df_title_akas, ["types", "attributes"])
df_name_basics = split_to_array(df_name_basics, ["primaryProfession", "knownForTitles"])
df_title_basics = split_to_array(df_title_basics, ["genres"])
df_title_crew = split_to_array(df_title_crew, ["directors", "writers"])

## Wyświetlmy dane

In [None]:
# Preview the first 3 rows of every table after the string -> array conversion.
for frame in (df_name_basics, df_title_akas, df_title_basics, df_title_crew,
              df_title_episode, df_title_principals, df_title_ratings):
  frame.show(3)

Wyświetlmy typy danych

In [None]:
# Print the column names and Spark types of every table.
for frame in (df_name_basics, df_title_akas, df_title_basics, df_title_crew,
              df_title_episode, df_title_principals, df_title_ratings):
  frame.printSchema()

# Usunięcie duplikatów

In [None]:
# Keep a single copy of every exactly repeated row, table by table.
(df_name_basics, df_title_akas, df_title_basics, df_title_crew,
 df_title_episode, df_title_principals, df_title_ratings) = [
  frame.distinct()
  for frame in (df_name_basics, df_title_akas, df_title_basics, df_title_crew,
                df_title_episode, df_title_principals, df_title_ratings)
]