### Importamos los modulos

In [1]:
separador="="*120

In [2]:
import findspark # Este módulo proporciona una función para localizar la instalación de Spark en el sistema.
findspark.init() # Inicializa el entorno de Spark.
findspark.find() # Devuelve la ruta al directorio de Spark. 

'C:\\Users\\guti_\\anaconda3\\envs\\pyspark-env\\Lib\\site-packages\\pyspark'

In [3]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession(sc)

In [4]:
sc

### Setup

In [5]:
# Ruta del archivo
location="celulares_PySpark.csv"

In [6]:
# CSV options
file_type = "csv" # Tipo de archivo
infer_schema = "true" # Esta opción indica si se debe intentar inferir automáticamente el esquema de los datos.
first_row_is_header = "true" # Indica si la primera fila del archivo CSV contiene encabezados.
delimiter = "," # Especifica el delimitador utilizado en el archivo CSV.

### Generamos un DataFrame

In [7]:
from pyspark.sql.functions import col

# Leer datos de un archivo CSV a un DataFrame
# Las opciones aplicadas son para archivos CSV. Para otros tipos de archivos, estos serán ignorados.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(location)

### Generamos un RDD

In [8]:
# Cargamos el RDD
lines=sc.textFile(location)
lines

celulares_PySpark.csv MapPartitionsRDD[11] at textFile at <unknown>:0

In [9]:
# Verificamos la cantidad de lineas que posee
lines.count()

870

In [10]:
# Visualizamos la primera linea
lines.first()

'modelo,precio,rating,sim,procesador,bateria,memoria,os,5G,NFC,carga_rapida,almacenamiento,Memory Card,ram,rom'

#### Transformación

Vamos a crear un nuevo RDD y le vamos a decir que esto es un remuestreo del 25 % de los casos que tenemos en el original sin reemplazo. 

In [11]:
# Muestrea el 25% de los elementos sin repeticiones
lines2 = lines.sample(fraction=0.25, withReplacement=False)
lines2

PythonRDD[14] at RDD at PythonRDD.scala:53

In [12]:
# Verificamos la cantidad de lineas que posee
lines2.count()

190

In [13]:
# Visualizamos la primera linea
lines2.first()

'OnePlus Nord CE 2 Lite 5G,239.0,81,Dual Sim,Snapdragon 695,5000,Hybrid,Android v12,true,false,true,1000,true,6,128'

#### Lambda

Vamos a utilizar una función lambda para poder determinar la cantidad de lineas en donde aparece la palabra "Android" en el RDD

In [14]:
resultado=lines.filter(lambda line: "Android" in line)

In [15]:
resultado

PythonRDD[17] at RDD at PythonRDD.scala:53

##### Método con RDD

In [16]:
# Cantidad de veces que se repite la palabra "Android" en el RDD
conteo_rdd=resultado.count()
print(separador)
# Mostrar el resultado
print(f"La palabra 'Android' se repite en {conteo_rdd} lineas.")
print(separador)

La palabra 'Android' se repite en 642 lineas.


In [17]:
# Visualizamos las primeras dos filas en donde aparece la palabra 'Android'
resultado.take(2)

['OnePlus 11 5G,657.59,89,Dual Sim,Snapdragon 8 Gen2,5000,Not Supported,Android v13,true,true,true,0,true,12,256',
 'OnePlus Nord CE 2 Lite 5G,239.0,81,Dual Sim,Snapdragon 695,5000,Hybrid,Android v12,true,false,true,1000,true,6,128']

##### Método con DataFrame

In [18]:
# Inicializar el contador
conteo_total = 0

# Iterar sobre todas las columnas del DataFrame
for columna in df.columns:
    # Contar las ocurrencias de la palabra "Android" en la columna actual
    conteo_columna = df.filter(col(columna).contains("Android")).count()
    # Actualizar el contador total
    conteo_total += conteo_columna
print(separador)
# Mostrar el resultado
print(f"La palabra 'Android' se repite {conteo_total} veces.")
print(separador)

La palabra 'Android' se repite 642 veces.


In [19]:
# Visualizamos la primer fila en donde aparece la palabra 'Android'
# Inicializar una lista para almacenar las filas que contienen la palabra "Android"
filas_android = []

# Iterar sobre todas las columnas del DataFrame
for columna in df.columns:
    # Filtrar las filas que contienen la palabra "Android" en la columna actual
    filas_filtradas = df.filter(col(columna).contains("Android")).head(2)  # Obtener las primeras dos filas
    # Extender la lista de filas con las filas filtradas
    filas_android.extend(filas_filtradas)
                        
print(separador)
# Mostrar las filas obtenidas
for fila in filas_android:
    print(fila)
print(separador)

Row(modelo='OnePlus 11 5G', precio=657.59, rating=89, sim='Dual Sim', procesador='Snapdragon 8 Gen2', bateria=5000, memoria='Not Supported', os='Android v13', 5G=True, NFC=True, carga_rapida=True, almacenamiento=0, Memory Card=True, ram=12, rom=256)
Row(modelo='OnePlus Nord CE 2 Lite 5G', precio=239.0, rating=81, sim='Dual Sim', procesador='Snapdragon 695', bateria=5000, memoria='Hybrid', os='Android v12', 5G=True, NFC=False, carga_rapida=True, almacenamiento=1000, Memory Card=True, ram=6, rom=128)


#### Collect

Utilizamos collect para mostrar las lineas en donde aparece la palabra 'EMUI'

In [20]:
lineas_EMUI = lines.filter(lambda linea: "EMUI" in linea)
print(separador)

for linea in lineas_EMUI.collect():
    print(linea)
    
print(separador)

Huawei Nova Y61,262.92,63,Dual Sim,Octa Core Processor,5000,Hybrid,EMUI v12,false,false,true,0,true,4,64
Huawei Nova Y90,274.88,82,Dual Sim,Snapdragon  680,5000,Supported,EMUI v12,false,true,true,0,true,8,128


#### Distinct

In [21]:
primer_fila = lines.first()
primer_fila

'modelo,precio,rating,sim,procesador,bateria,memoria,os,5G,NFC,carga_rapida,almacenamiento,Memory Card,ram,rom'

Ahora vamos a filtrar la primer linea del RDD (es el nombre de las columnas del csv)

In [22]:
# Separamos por coma
campos_primera_fila = primer_fila.split(",")
campos_primera_fila

['modelo',
 'precio',
 'rating',
 'sim',
 'procesador',
 'bateria',
 'memoria',
 'os',
 '5G',
 'NFC',
 'carga_rapida',
 'almacenamiento',
 'Memory Card',
 'ram',
 'rom']

In [23]:
# Definir una función para filtrar la primera línea
def filtrar_primera_linea(linea, primera_linea):
    return linea != primera_linea

# Ignorar la primera línea del RDD
lines3 = lines.filter(lambda linea: filtrar_primera_linea(linea, primer_fila))
lines3

PythonRDD[187] at RDD at PythonRDD.scala:53

In [24]:
# Verificamos la cantidad de lineas que posee
lines3.count()

869

In [25]:
# Guardamos la transformación
lines3.persist()

PythonRDD[187] at RDD at PythonRDD.scala:53

In [26]:
# Visualizamos las primeras 5 lineas del RDD filtrado
lines3.take(5)

['OnePlus 11 5G,657.59,89,Dual Sim,Snapdragon 8 Gen2,5000,Not Supported,Android v13,true,true,true,0,true,12,256',
 'OnePlus Nord CE 2 Lite 5G,239.0,81,Dual Sim,Snapdragon 695,5000,Hybrid,Android v12,true,false,true,1000,true,6,128',
 'Samsung Galaxy A14 5G,197.27,75,Dual Sim,Exynos 1330,5000,Supported,Android v13,true,false,true,1000,true,4,64',
 'Motorola Moto G62 5G,179.33,81,Dual Sim,Snapdragon  695,5000,Hybrid,Android v12,true,false,true,1000,true,6,128',
 'Realme 10 Pro Plus,298.9,82,Dual Sim,Dimensity 1080,5000,Not Supported,Android v13,true,false,true,0,true,6,128']

In [27]:
# Extraemos la información sobre la ram
ram_rdd = lines3.map(lambda linea: linea.split(",")[13])
ram_rdd

PythonRDD[190] at RDD at PythonRDD.scala:53

In [28]:
# Aplicamos la funcion distinct() junto con collect() para poder visualizar los distintos tamaños de RAM
ram_rdd.distinct().collect()

['6', '4', '3', '16', '18', '12', '8', '2', '512']

#### Función numérica con RDD

In [29]:
# Extraemos la información sobre los precios
precios_rdd = lines3.map(lambda linea: float(linea.split(",")[1]))
precios_rdd

PythonRDD[196] at RDD at PythonRDD.scala:53

##### Precio mínimo

In [30]:
precio_min=round(precios_rdd.min(),2)
print(separador)
print(f"El precio mínimo de los celulares es: {precio_min} USD.")
print(separador)

El precio mínimo de los celulares es: 41.84 USD.


##### Precio promedio

In [31]:
precio_prom=round(precios_rdd.mean(),2)
print(separador)
print(f"El precio promedio de los celulares es: {precio_prom} USD.")
print(separador)

El precio promedio de los celulares es: 348.9 USD.


##### Precio máximo

In [32]:
precio_max=round(precios_rdd.max(),2)
print(separador)
print(f"El precio máximo de los celulares es: {precio_max} USD.")
print(separador)

El precio máximo de los celulares es: 7771.66 USD.


In [33]:
# Cerramos la sesion de Spark
sc.stop()