<a href="https://colab.research.google.com/github/eerjoker/pyspark_practice/blob/main/pyspark_practice_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Leo un csv y filtro



In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.csv('/content/sample_data/california_housing_test.csv')

# filtro y muestro
df.where(df['_c1'] > 40).select(df['_c0'], df['_c1']).show(5)

+-----------+---------+
|        _c0|      _c1|
+-----------+---------+
|-123.520000|41.010000|
|-122.270000|41.230000|
|-124.160000|41.920000|
|-124.170000|41.800000|
|-122.320000|41.310000|
+-----------+---------+
only showing top 5 rows



# Guardar como parquet

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.csv('/content/sample_data/california_housing_test.csv')

# guardo parquet
dir = '/content/sample_data/new_files'
df.write.parquet(dir, mode="overwrite")
spark.read.parquet(dir).show()

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|       _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000|277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000|495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000| 11.000000|     6.135900|     330000.

#Arreglo headers y agrego columna date

In [None]:
from pyspark.sql import SparkSession, functions

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.option("header", True).csv('/content/sample_data/california_housing_test.csv')

# nueva columna
df = df.withColumn('fecha_hoy', functions.current_date())

# guardo
dir = '/content/sample_data/new_files'
df.write.parquet(dir, mode="overwrite")
spark.read.parquet(dir).show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+----------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value| fecha_hoy|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+----------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|2025-04-30|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000| 277.000000|     3.599000|     176500.000000|2025-04-30|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|2025-04-30|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|2025-04-30|
|-119.670000|36.330000|         19

#Select, fecha ayer y ultima fecha habil

In [None]:
from pyspark.sql import SparkSession, functions

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.option("header", True).csv('/content/sample_data/california_housing_test.csv')


# nuevas columnas
df = df.withColumn('fecha_hoy', functions.current_date())
df = df.withColumn('fecha_ayer',  functions.date_add(df['fecha_hoy'], -1)) \
  .withColumn('ultima_fecha_habil', functions.when(functions.dayofweek(df['fecha_hoy']) == 1, functions.date_add(df['fecha_hoy'], -2))
                                  .when(functions.dayofweek(df['fecha_hoy']) == 2, functions.date_add(df['fecha_hoy'], -3))
                                  .otherwise(functions.date_add(df['fecha_hoy'], -1)))

#creo tabla temp
df.createOrReplaceTempView('calif_housing_table')

#select
spark.sql("select * from calif_housing_table where latitude < 35").show()

+-----------+---------+------------------+------------+--------------+-----------+-----------+-------------+------------------+----------+----------+------------------+
|  longitude| latitude|housing_median_age| total_rooms|total_bedrooms| population| households|median_income|median_house_value| fecha_hoy|fecha_ayer|ultima_fecha_habil|
+-----------+---------+------------------+------------+--------------+-----------+-----------+-------------+------------------+----------+----------+------------------+
|-118.300000|34.260000|         43.000000| 1510.000000|    310.000000| 809.000000| 277.000000|     3.599000|     176500.000000|2025-05-01|2025-04-30|        2025-04-30|
|-117.810000|33.780000|         27.000000| 3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|2025-05-01|2025-04-30|        2025-04-30|
|-118.360000|33.820000|         28.000000|   67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|2025-05-01|2025-04-30|      

#Otra forma con fecha ayer en sql

In [None]:
from pyspark.sql import SparkSession, functions

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.option("header", True).csv('/content/sample_data/california_housing_test.csv')

# nuevas columnas
df = df.withColumn('fecha_hoy', functions.current_date())

#creo tabla temp
df.createOrReplaceTempView('calif_housing_table')

#select
spark.sql("select fecha_hoy, DATEADD(fecha_hoy, -1) fecha_ayer from calif_housing_table").show()

+----------+----------+
| fecha_hoy|fecha_ayer|
+----------+----------+
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
|2025-05-01|2025-04-30|
+----------+----------+
only showing top 20 rows



# from json to parquet

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.option("multiLine", True).json('/content/sample_data/anscombe.json')

# guardo
dir = '/content/sample_data/new_files'
df.write.parquet(dir, mode="overwrite")
spark.read.parquet(dir).show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  NULL|NULL| NULL|              [|
|     I|10.0| 8.04|           NULL|
|     I| 8.0| 6.95|           NULL|
|     I|13.0| 7.58|           NULL|
|     I| 9.0| 8.81|           NULL|
|     I|11.0| 8.33|           NULL|
|     I|14.0| 9.96|           NULL|
|     I| 6.0| 7.24|           NULL|
|     I| 4.0| 4.26|           NULL|
|     I|12.0|10.84|           NULL|
|     I| 7.0| 4.81|           NULL|
|     I| 5.0| 5.68|           NULL|
|    II|10.0| 9.14|           NULL|
|    II| 8.0| 8.14|           NULL|
|    II|13.0| 8.74|           NULL|
|    II| 9.0| 8.77|           NULL|
|    II|11.0| 9.26|           NULL|
|    II|14.0|  8.1|           NULL|
|    II| 6.0| 6.13|           NULL|
|    II| 4.0|  3.1|           NULL|
|    II|12.0| 9.13|           NULL|
|    II| 7.0| 7.26|           NULL|
|    II| 5.0| 4.74|           NULL|
|   III|10.0| 7.46|           NULL|
|   III| 8.0| 6.77|         

#from fixed length to parquet

In [None]:
from pyspark.sql import SparkSession

schema = [
    (0, 8),
    (8, 16),
    (24, 16),
    (40, 12),
    (52, 14),
    (66, 16),
    (82, 7)
]

spark = SparkSession.builder.appName("main_app").getOrCreate()
df = spark.read.text('/content/sample_data/credit_balance_01.fw')

# obtengo los headers
headers = str(df.first()[0])

# completo cada columna separando segun el schema
for colinfo in schema:
  df = df.withColumn(headers[colinfo[0] : colinfo[1] + colinfo[0]], \
                     df.value.substr(colinfo[0], colinfo[1]))

# limpio la fila de headers y la columna de la primer lectura
df = df.filter(~functions.col('value').contains(headers)) \
       .drop('value')

# guardo
dir = '/content/sample_data/new_files'
df.write.parquet(dir, mode="overwrite")
spark.read.parquet(dir).show()

+--------+----------------+----------------+------------+--------------+----------------+-------+
|Account |LastName        |FirstName       |Balance     |CreditLimit   |AccountCreated  |Rating |
+--------+----------------+----------------+------------+--------------+----------------+-------+
|101     | Reeves         | Keanu          | 9315.45    | 10000.00     | 1/17/1998      | A     |
|312     | Butler         | Gerard         | 90.00      | 1000.00      | 8/6/2003       | B     |
|868     | Hewitt         | Jennifer Love  | 0          | 17000.00     | 5/25/1985      | B     |
|761     | Pinkett-Smith  | Jada           | 49654.87   | 100000.00    | 12/5/2006      | A     |
|317     | Murray         | Bill           | 789.65     | 5000.00      | 2/5/2007       | C     |
+--------+----------------+----------------+------------+--------------+----------------+-------+



# from fixed length to parquet with classes and casting

In [None]:
from pyspark.sql import SparkSession, types, functions

class FixedLengthToParquet:
  def __init__(self, schema, inputFileName, outputFileName):
    self.schema = schema
    self.inputFileName = inputFileName
    self.outputFileName = outputFileName

  def run(self):
    spark = SparkSession.builder.appName("main_app").getOrCreate()
    df = spark.read.text(self.inputFileName)

    # obtengo los headers
    headers = str(df.first()[0])

    # completo cada columna separando segun el schema
    for colinfo in schema:
      df = df.withColumn(headers[colinfo[0] : colinfo[1] + colinfo[0]], \
        functions.to_date(functions.trim(df.value.substr(colinfo[0], colinfo[1])), "M/d/yyyy") \
          if isinstance(colinfo[2], types.DateType) \
          else df.value.substr(colinfo[0], colinfo[1]).cast(colinfo[2]))

    # limpio la fila de headers y la columna de la primer lectura
    df = df.filter(~functions.col('value').contains(headers)) \
           .drop('value')

    # guardo
    dir = self.outputFileName
    df.write.parquet(dir, mode="overwrite")
    spark.read.parquet(dir).show()

schema = [
    (0, 8, types.IntegerType()),
    (8, 16, types.StringType()),
    (24, 16, types.StringType()),
    (40, 12, types.DoubleType()),
    (52, 14, types.DoubleType()),
    (66, 16, types.DateType()),
    (82, 7, types.StringType())
]

# main
FixedLengthToParquet(schema, \
                     '/content/sample_data/credit_balance_01.fw', \
                     '/content/sample_data/new_files') \
                     .run()


+--------+----------------+----------------+------------+--------------+----------------+-------+
|Account |LastName        |FirstName       |Balance     |CreditLimit   |AccountCreated  |Rating |
+--------+----------------+----------------+------------+--------------+----------------+-------+
|     101| Reeves         | Keanu          |     9315.45|       10000.0|      1998-01-17| A     |
|     312| Butler         | Gerard         |        90.0|        1000.0|      2003-08-06| B     |
|     868| Hewitt         | Jennifer Love  |         0.0|       17000.0|      1985-05-25| B     |
|     761| Pinkett-Smith  | Jada           |    49654.87|      100000.0|      2006-12-05| A     |
|     317| Murray         | Bill           |      789.65|        5000.0|      2007-02-05| C     |
+--------+----------------+----------------+------------+--------------+----------------+-------+

