In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain a SparkSession for this notebook under the application name 'miss'.
spark = (
    SparkSession.builder
    .appName('miss')
    .getOrCreate()
)

23/12/23 09:25:17 WARN Utils: Your hostname, Jotunheim.local resolves to a loopback address: 127.0.0.1; using 192.168.100.118 instead (on interface en0)
23/12/23 09:25:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/23 09:25:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Load the sample CSV: the first line provides the column names and the
# column types are inferred from the data instead of defaulting to strings.
df = spark.read.csv(
    'Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv',
    inferSchema=True,
    header=True,
)

In [7]:
# Inspect the inferred schema: Id and Name are strings, Sales is a double,
# and every column is nullable.
df.schema

StructType([StructField('Id', StringType(), True), StructField('Name', StringType(), True), StructField('Sales', DoubleType(), True)])

In [8]:
# Display the raw rows; Name and Sales contain nulls, Id does not.
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Dropando dados faltantes

In [11]:
# Drop every row that has a null in at least one column.
# Only the fully populated row (emp4) remains.
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [13]:
# thresh is the minimum number of NON-null values a row needs in order to be
# kept: rows with fewer than 2 non-null values are dropped.
# With the 3 columns here that removes emp2, whose only non-null value is Id.
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [14]:
# The `how` parameter controls when a row is dropped:
# any -> if any column is null
# all -> only if every column is null
# No row is removed here because Id is never null.
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [15]:
# The `subset` parameter restricts the null check to the listed columns:
# rows are dropped only when Sales is null, nulls in Name are ignored.
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Preenchendo dados nulos

In [16]:
# Fill nulls in every column whose type matches the fill value.
# A string value only affects string columns, so Name is filled while the
# numeric Sales column keeps its nulls.
df.na.fill('Sem Nome').show()

+----+--------+-----+
|  Id|    Name|Sales|
+----+--------+-----+
|emp1|    John| NULL|
|emp2|Sem Nome| NULL|
|emp3|Sem Nome|345.0|
|emp4|   Cindy|456.0|
+----+--------+-----+



In [17]:
# A numeric fill value only affects numeric columns: Sales nulls become 0.0
# while the string column Name keeps its nulls.
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| NULL|  0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [18]:
# Fill nulls only in the columns listed in `subset`.
df.na.fill('Sem Nome', subset=['Name']).show()

+----+--------+-----+
|  Id|    Name|Sales|
+----+--------+-----+
|emp1|    John| NULL|
|emp2|Sem Nome| NULL|
|emp3|Sem Nome|345.0|
|emp4|   Cindy|456.0|
+----+--------+-----+



In [39]:
from pyspark.sql.functions import mean

In [40]:
# Show df again: the drop/fill calls above displayed new results and left df
# itself unchanged, so the original nulls are still present.
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [41]:
# Average of the Sales column, collected as a list holding a single Row.
# Null Sales values do not take part in the average (345.0 and 456.0 -> 400.5).
mean_value = (
    df
    .select(mean('Sales'))
    .collect()
)
mean_value

[Row(avg(Sales)=400.5)]

In [43]:
# Fill null Sales values with the column mean.
# mean_value[0][0] is the first field of the first collected Row, i.e. the average.
df.na.fill(mean_value[0][0], subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

