In [1]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

In [2]:
# Path to the service-account JSON key handed to the GCS connector.
# Prefer the GOOGLE_APPLICATION_CREDENTIALS environment variable so the
# notebook is not tied to a single machine's home directory; when it is unset
# or empty, fall back to the path this notebook was originally written with.
credentials_location = (
    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    or '/home/abdelali/.gc/de-zoomcamp-project-449906-76a2dac480f2.json'
)

# Spark configuration: local master using all cores, with the GCS connector
# jar on the classpath and service-account authentication enabled.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

sc = SparkContext(conf=conf)

# Register the gs:// filesystem implementations and the key file directly on
# the Hadoop configuration of the running context.
# NOTE(review): `sc._jsc` is a private PySpark attribute; the same keys could
# presumably be passed as `spark.hadoop.*` entries on `conf` -- confirm before
# changing.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/02/05 09:32:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Build (or reuse) the SparkSession on top of the already-running context,
# carrying over the configuration that context was created with.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [4]:
# Base names of the raw CSV files to load from the bucket's raw/ prefix.
files = [
    "appearances",
    "club_games",
    "clubs",
    "competitions",
    "game_events",
    "game_lineups",
    "games",
    "player_valuations",
    "players",
    "transfers",
]

# Map each base name to a DataFrame read from its CSV, treating the first row
# as the header and letting Spark infer the column types.
dfs = dict(
    (
        table,
        spark.read
        .options(header='true', inferSchema=True)
        .csv(f'gs://de-zoomcamp-project-449906_bucket/raw/{table}.csv'),
    )
    for table in files
)

                                                                                

In [5]:
# Print the inferred schema of every loaded table, in load order.
for table in dfs:
    dfs[table].printSchema()

root
 |-- appearance_id: string (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- player_club_id: integer (nullable = true)
 |-- player_current_club_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- player_name: string (nullable = true)
 |-- competition_id: string (nullable = true)
 |-- yellow_cards: integer (nullable = true)
 |-- red_cards: integer (nullable = true)
 |-- goals: integer (nullable = true)
 |-- assists: integer (nullable = true)
 |-- minutes_played: integer (nullable = true)

root
 |-- game_id: integer (nullable = true)
 |-- club_id: integer (nullable = true)
 |-- own_goals: integer (nullable = true)
 |-- own_position: integer (nullable = true)
 |-- own_manager_name: string (nullable = true)
 |-- opponent_id: integer (nullable = true)
 |-- opponent_goals: integer (nullable = true)
 |-- opponent_position: integer (nullable = true)
 |-- opponent_manager_name: string (nullable = true)
 |-- hosting: st

In [6]:
# Preview the first five rows of every loaded table.
PREVIEW_ROWS = 5
for _, frame in dfs.items():
    frame.show(PREVIEW_ROWS)

+--------------+-------+---------+--------------+----------------------+----------+----------------+--------------+------------+---------+-----+-------+--------------+
| appearance_id|game_id|player_id|player_club_id|player_current_club_id|      date|     player_name|competition_id|yellow_cards|red_cards|goals|assists|minutes_played|
+--------------+-------+---------+--------------+----------------------+----------+----------------+--------------+------------+---------+-----+-------+--------------+
| 2231978_38004|2231978|    38004|           853|                   235|2012-07-03|Aurélien Joachim|           CLQ|           0|        0|    2|      0|            90|
| 2233748_79232|2233748|    79232|          8841|                  2698|2012-07-05|  Ruslan Abyshov|           ELQ|           0|        0|    0|      0|            90|
| 2234413_42792|2234413|    42792|          6251|                   465|2012-07-05|     Sander Puri|           ELQ|           0|        0|    0|      0|        

In [7]:
# Report how many partitions Spark created for each table.
for file, frame in dfs.items():
    partition_count = frame.rdd.getNumPartitions()
    print(f"{file}: {partition_count} partitions.")

appearances: 4 partitions.
club_games: 3 partitions.
clubs: 1 partitions.
competitions: 1 partitions.
game_events: 4 partitions.
game_lineups: 4 partitions.
games: 4 partitions.
player_valuations: 4 partitions.
players: 3 partitions.
transfers: 2 partitions.


In [8]:
# Tables that are collapsed into a single partition before writing, so each
# is written from one partition instead of its original partitioning.
single_partition_tables = {'club_games', 'players', 'games', 'player_valuations', 'transfers'}

# Write every table to its own parquet directory under pq/, replacing any
# existing output.
for file, frame in dfs.items():
    output_path = f'gs://de-zoomcamp-project-449906_bucket/pq/{file}/'
    to_write = frame.coalesce(1) if file in single_partition_tables else frame
    to_write.write.parquet(output_path, mode='overwrite')

                                                                                