## Inicializamos Spark y cargamos el CSV «3 Juegos en steam.csv» en el DataFrame `df`

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Start (or reuse) the Spark session used by the cells below
session_builder = SparkSession.builder.appName("Game Market Analysis")
spark = session_builder.getOrCreate()

# Read the Steam games CSV: the first row supplies column names and Spark infers column types
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv("3 Juegos en steam.csv")


## Revisamos el tipo de cada columna para transformar los datos a conveniencia

In [6]:
# Print every column's name, inferred type and nullability
df.printSchema()


root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- english: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- platforms: string (nullable = true)
 |-- required_age: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- steamspy_tags: string (nullable = true)
 |-- achievements: string (nullable = true)
 |-- positive_ratings: string (nullable = true)
 |-- negative_ratings: integer (nullable = true)
 |-- average_playtime: integer (nullable = true)
 |-- median_playtime: integer (nullable = true)
 |-- owners: string (nullable = true)
 |-- price: string (nullable = true)



### Convertimos a tipos numéricos las columnas que lo necesitan

In [7]:
# String columns that should be numeric, mapped to the type each one is cast to
target_types = {
    "english": IntegerType(),
    "required_age": IntegerType(),
    "achievements": IntegerType(),
    "price": DoubleType(),
}
for column_name, spark_type in target_types.items():
    df = df.withColumn(column_name, df[column_name].cast(spark_type))

# Print the schema again to verify the new column types
df.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- english: integer (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- platforms: string (nullable = true)
 |-- required_age: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- steamspy_tags: string (nullable = true)
 |-- achievements: integer (nullable = true)
 |-- positive_ratings: string (nullable = true)
 |-- negative_ratings: integer (nullable = true)
 |-- average_playtime: integer (nullable = true)
 |-- median_playtime: integer (nullable = true)
 |-- owners: string (nullable = true)
 |-- price: double (nullable = true)



In [10]:
from pyspark.sql import functions as F

# positive_ratings is a string column in the schema printed earlier, so cast it
# explicitly rather than relying on Spark's implicit string-to-number coercion
# when comparing it with integers.
positive_ratings_int = F.col("positive_ratings").cast("int")

# Games that received more positive than negative ratings
df_filtered = df.where(positive_ratings_int > F.col("negative_ratings"))

# Average playtime for each distinct genre combination
df.groupBy("genres").agg(F.avg("average_playtime").alias("avg_playtime")).show()

# Parse release_date strings (yyyy-MM-dd) into a date column
df = df.withColumn("release_date", F.to_date(df.release_date, "yyyy-MM-dd"))

# Expose the DataFrame to Spark SQL and list games with more than 1000 positive ratings
df.createOrReplaceTempView("steam_data")
spark.sql(
    "SELECT name, genres FROM steam_data WHERE CAST(positive_ratings AS INT) > 1000"
).show()




+--------------------+------------------+
|              genres|      avg_playtime|
+--------------------+------------------+
|    Casual;Indie;RPG| 46.78333333333333|
|Adventure;Casual;...|24.490909090909092|
|Adventure;Free to...| 6.666666666666667|
|Design & Illustra...|               0.0|
|Casual;Free to Pl...|               0.0|
|Animation & Model...|               0.0|
|Adventure;Casual;...|               0.0|
|Action;Free to Pl...|               0.0|
|Sexual Content;Vi...|              55.0|
|Action;Adventure;...|              14.0|
|Strategy;Indie;Ca...|              91.5|
|Action;Adventure;...|27.805555555555557|
|Casual;Free to Pl...|               0.0|
|Nudity;Violent;Ad...|33.666666666666664|
|       Sports;Casual|               0.0|
|Free to Play;Indi...|              28.0|
|Action;Adventure;...|             138.0|
|Action;Adventure;...|            1273.0|
|Adventure;Casual;...|               0.0|
|Action;Adventure;...|               0.0|
+--------------------+------------

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Obtain the Spark session for this analysis (getOrCreate returns the active session if one exists)
spark = SparkSession.builder.appName("Game Market Analysis with ML").getOrCreate()

# Read the CSV with a header row and inferred column types
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv("3 Juegos en steam.csv")

# Columns that need an explicit numeric cast, with their target types
numeric_casts = (
    ("positive_ratings", IntegerType()),
    ("achievements", IntegerType()),
    ("price", DoubleType()),
)
for column_name, numeric_type in numeric_casts:
    df = df.withColumn(column_name, F.col(column_name).cast(numeric_type))

# Discard every row that has a null in any column
df = df.dropna()

# Preview the first rows and the resulting schema
df.show(5)
df.printSchema()

# One row per (game, genre): split the semicolon-separated genres string and explode it
genre_array = F.split("genres", ";")
df_exploded = df.withColumn("genres", F.explode(genre_array))
games_by_genre = df_exploded.groupBy("genres")

# Ten genres with the highest summed positive ratings
total_positive = F.sum("positive_ratings").alias("total_positive_ratings")
games_by_genre.agg(total_positive).sort("total_positive_ratings", ascending=False).show(10)

# Genres ranked by mean price, most expensive first
mean_price = F.avg("price").alias("average_price")
games_by_genre.agg(mean_price).sort("average_price", ascending=False).show()

# Genres ranked by mean of average_playtime, longest first
mean_playtime = F.avg("average_playtime").alias("average_playtime")
games_by_genre.agg(mean_playtime).sort("average_playtime", ascending=False).show()

# Feature Engineering: assemble the numeric predictor columns into one vector column
feature_columns = ['positive_ratings', 'negative_ratings', 'average_playtime', 'median_playtime', 'achievements']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Train/Test Split: 80/20 with a fixed seed so the split is repeatable
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Machine Learning Model: linear regression predicting price from the assembled features
lr = LinearRegression(featuresCol='features', labelCol='price')
pipeline = Pipeline(stages=[assembler, lr])

# Train the model on the training split only
model = pipeline.fit(train_data)

# Evaluate the model: preview a few predictions next to the true price ...
test_results = model.transform(test_data)
test_results.select("prediction", "price", "features").show(5)

# ... and quantify the fit on the held-out split, since a handful of rows
# says nothing about overall error.
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
rmse = evaluator.evaluate(test_results, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(test_results, {evaluator.metricName: "r2"})
print(f"Test RMSE: {rmse:.3f}")
print(f"Test R2:   {r2:.3f}")

# Stop Spark Session (must come after every action that needs the session)
spark.stop()


+-----+--------------------+------------+-------+----------------+---------+-----------------+------------+--------------------+------+--------------------+------------+----------------+----------------+----------------+---------------+-----------------+-----+
|appid|                name|release_date|english|       developer|publisher|        platforms|required_age|          categories|genres|       steamspy_tags|achievements|positive_ratings|negative_ratings|average_playtime|median_playtime|           owners|price|
+-----+--------------------+------------+-------+----------------+---------+-----------------+------------+--------------------+------+--------------------+------------+----------------+----------------+----------------+---------------+-----------------+-----+
|   10|      Counter-Strike|  2000-11-01|      1|           Valve|    Valve|windows;mac;linux|           0|Multi-player;Onli...|Action|Action;FPS;Multip...|           0|          124534|            3339|           176