In [10]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [11]:
# Create (or reuse) the Spark session used by the CSV-reading cells below.
spark = (
    SparkSession.builder
    .appName('Lectura de archivos CSV')
    .getOrCreate()
)

In [12]:
# Downloads the 'vgsales' dataset directly from kaggle.com.
# The download code is wrapped in a string literal, so it is NOT executed;
# being the last expression of the cell, the string is simply echoed as output.
'''
import kagglehub

path = kagglehub.dataset_download("gregorut/videogamesales")

print("Path to dataset files:", path)
'''


'\nimport kagglehub\n\npath = kagglehub.dataset_download("gregorut/videogamesales")\n\nprint("Path to dataset files:", path)\n'

In [13]:
# Location of the dataset inside the local kagglehub cache.
path = '/root/.cache/kagglehub/datasets/gregorut/videogamesales/versions/2'

# Read the CSV data at `path`: first row is the header, column types are inferred.
df = spark.read.options(header=True, inferSchema=True).csv(path)

In [14]:
# Preview the first 10 rows of the loaded DataFrame.
df.show(n=10)

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|
|   6|              Tetris|      GB|1989|      Puzzle| Nintendo|

In [15]:
# Total number of rows read from the CSV data.
df.count()

16598

In [16]:
# Running SQL statements through spark.sql.
# The DataFrame must first be exposed as a temporary view so SQL can refer to it by name.
df.createOrReplaceTempView('VGSALES')

# Sales totals per (Genre, Publisher) group, sorted by publisher name descending.
# Genre is a grouping key but is not selected, so a publisher can appear on several rows.
sql_str = (
    'select Publisher, sum(NA_Sales), sum(Global_Sales), sum(JP_Sales) '
    'from VGSALES '
    'group by Genre, Publisher '
    'order by Publisher desc'
)
spark.sql(sql_str).show()

+--------------------+-------------------+------------------+-------------+
|           Publisher|      sum(NA_Sales)| sum(Global_Sales)|sum(JP_Sales)|
+--------------------+-------------------+------------------+-------------+
|        responDESIGN|0.09000000000000001|              0.13|          0.0|
|           mixi, Inc|                0.0|              0.86|         0.86|
|inXile Entertainment|               0.02|               0.1|          0.0|
|     imageepoch Inc.|                0.0|              0.01|         0.01|
|     imageepoch Inc.|                0.0|              0.03|         0.03|
|         id Software|               0.02|              0.03|          0.0|
|                iWin|                0.0|              0.06|          0.0|
|              fonfun|                0.0|              0.02|         0.02|
|     dramatic create|                0.0|               0.1|          0.1|
|     dramatic create|                0.0|              0.01|         0.01|
|   bitCompo

In [17]:
# Register the DataFrame as the VGSALES view so this cell can run on its own.
df.createOrReplaceTempView('VGSALES')

# Sales totals per (Genre, Publisher) group, rounded to 2 decimals and given
# readable aliases, sorted by global sales descending.
# The sort key is named explicitly (instead of the ordinal `order by 3`) so the
# ordering does not silently change if the select list is reordered or extended.
# NOTE(review): Genre is a grouping key but is not selected, so the same publisher
# appears once per genre; add Genre to the select list if rows must be distinguishable.
sql_str = (
    'select Publisher, '
    'round(sum(NA_Sales),2) as TotalNA, '
    'round(sum(Global_Sales),2) as TotalGlobal, '
    'round(sum(JP_Sales),2) as TotalJP '
    'from VGSALES '
    'group by Genre, Publisher '
    'order by TotalGlobal desc'
)

# Show the top 10 rows, truncating cell values longer than 80 characters.
spark.sql(sql_str).show(n=10, truncate=80)

+--------------------+-------+-----------+-------+
|           Publisher|TotalNA|TotalGlobal|TotalJP|
+--------------------+-------+-----------+-------+
|     Electronic Arts| 270.27|     479.67|   3.27|
|            Nintendo| 220.14|     427.21| 102.36|
|          Activision| 161.39|     299.87|   4.64|
|            Nintendo| 105.63|      284.9| 102.24|
|            Nintendo|  98.77|     218.01|  35.87|
|Take-Two Interactive| 101.45|     211.08|   4.44|
|            Nintendo|  61.98|     180.67|  55.25|
|     Electronic Arts|  81.15|     158.26|   2.93|
|            Nintendo|  73.55|      151.3|  29.22|
|     Electronic Arts|  75.52|     145.77|    1.2|
+--------------------+-------+-----------+-------+
only showing top 10 rows



In [18]:
from pyspark.sql.types import FloatType

# Cast every integer column to float, leaving all other columns untouched.
# A single select() is used instead of calling withColumn() in a loop: each
# withColumn() adds another projection to the query plan, while one select()
# produces the same columns (same names, same order) in a single projection.
df = df.select([
    df[col_name].cast(FloatType()).alias(col_name) if data_type == 'int' else df[col_name]
    for col_name, data_type in df.dtypes
])

# The VGSALES view was registered from the DataFrame as it was BEFORE the cast;
# re-register it so the spark.sql queries that follow read the casted columns.
df.createOrReplaceTempView('VGSALES')

In [19]:
# Re-run the query and print the first 5 rows in vertical layout
# (one field per line), truncating values longer than 40 characters.
spark.sql(sql_str).show(n=5, truncate=40, vertical=True)

-RECORD 0----------------------
 Publisher   | Electronic Arts 
 TotalNA     | 270.27          
 TotalGlobal | 479.67          
 TotalJP     | 3.27            
-RECORD 1----------------------
 Publisher   | Nintendo        
 TotalNA     | 220.14          
 TotalGlobal | 427.21          
 TotalJP     | 102.36          
-RECORD 2----------------------
 Publisher   | Activision      
 TotalNA     | 161.39          
 TotalGlobal | 299.87          
 TotalJP     | 4.64            
-RECORD 3----------------------
 Publisher   | Nintendo        
 TotalNA     | 105.63          
 TotalGlobal | 284.9           
 TotalJP     | 102.24          
-RECORD 4----------------------
 Publisher   | Nintendo        
 TotalNA     | 98.77           
 TotalGlobal | 218.01          
 TotalJP     | 35.87           
only showing top 5 rows



In [20]:
# Column names of the query result (the aliases defined in sql_str).
spark.sql(sql_str).columns

['Publisher', 'TotalNA', 'TotalGlobal', 'TotalJP']

In [21]:
# Number of rows returned by the query, i.e. one per (Genre, Publisher) group.
spark.sql(sql_str).count()

1837

In [22]:
# Shut down this Spark session; the next section creates a new one.
spark.stop()

Partición de datos

In [35]:
# Spark session for the partitionBy() examples.
spark = (
    SparkSession.builder
    .appName('PartitionBy() PySpark')
    .getOrCreate()
)

In [37]:
# Load the dataset with a header row but without schema inference,
# so every column comes back as a string (see the schema printed below).
dataframe = (
    spark.read
    .option('header', True)
    .csv('/root/.cache/kagglehub/datasets/gregorut/videogamesales/versions/2')
)

dataframe.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: string (nullable = true)
 |-- EU_Sales: string (nullable = true)
 |-- JP_Sales: string (nullable = true)
 |-- Other_Sales: string (nullable = true)
 |-- Global_Sales: string (nullable = true)



In [39]:
# Write the data as CSV partitioned by Platform, replacing any previous output.
path = '/content/drive/MyDrive/TAREA BASE DE DATOS/Spark/partition'
(
    dataframe.write
    .partitionBy('Platform')
    .option('header', True)
    .mode('overwrite')
    .csv(path)
)

In [40]:
# Write the data as CSV partitioned by Year, replacing any previous output.
path = '/content/drive/MyDrive/TAREA BASE DE DATOS/Spark/Year'
(
    dataframe.write
    .partitionBy('Year')
    .option('header', True)
    .mode('overwrite')
    .csv(path)
)

In [41]:
# Write the data as CSV with two partition levels: Year first, then Genre.
path = '/content/drive/MyDrive/TAREA BASE DE DATOS/Spark/Year_Genre'
(
    dataframe.write
    .partitionBy('Year', 'Genre')
    .option('header', True)
    .mode('overwrite')
    .csv(path)
)

In [44]:
# Write the data as CSV with two partition levels: Genre first, then Publisher.
path = '/content/drive/MyDrive/TAREA BASE DE DATOS/Spark/Pub_Genre'
(
    dataframe.write
    .partitionBy('Genre', 'Publisher')
    .option('header', True)
    .mode('overwrite')
    .csv(path)
)

In [45]:
# Shut down the partitionBy() session; the next section creates a new one.
spark.stop()

Coalesce y Repartition

In [47]:
# Spark session for the coalesce() / repartition() examples.
spark = (
    SparkSession.builder
    .appName('coalesce() PySpark')
    .getOrCreate()
)

# Load the dataset with a header row and no schema inference (all columns are strings).
path = '/root/.cache/kagglehub/datasets/gregorut/videogamesales/versions/2'
dataco = spark.read.options(header=True).csv(path)

dataco.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: string (nullable = true)
 |-- EU_Sales: string (nullable = true)
 |-- JP_Sales: string (nullable = true)
 |-- Other_Sales: string (nullable = true)
 |-- Global_Sales: string (nullable = true)



In [48]:
# Total number of rows loaded into dataco.
dataco.count()

16598

In [51]:
# Redistribute the rows into 20 partitions, then write them as CSV with a header,
# replacing any previous output in the target directory.
(
    dataco
    .repartition(20)
    .write
    .option('header', True)
    .mode('overwrite')
    .csv('/content/drive/MyDrive/TAREA BASE DE DATOS/Spark/repartition')
)

Coalesce

In [52]:
# Keep a 20-partition version of the data as the starting point for coalesce().
dataco_2 = dataco.repartition(numPartitions=20)

# Confirm the partition count of the repartitioned DataFrame.
dataco_2.rdd.getNumPartitions()

20

In [53]:
# Reduce the 20-partition DataFrame down to 10 partitions with coalesce().
dataco_3 = dataco_2.coalesce(numPartitions=10)

In [54]:
# Write the coalesced DataFrame as CSV with a header, replacing any previous output.
(
    dataco_3.write
    .option('header', True)
    .mode('overwrite')
    .csv('/content/drive/MyDrive/TAREA BASE DE DATOS/Spark/coalesce')
)