In [2]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("PySpark Interview Test")
    .getOrCreate()
)

# Column names, in the same positional order as the tuples below
columns = ["Name", "Age", "Occupation"]

# Sample records supplied for the exercise
data = [
    ("Alice", 34, "Data Scientist"),
    ("Bob", 45, "Data Engineer"),
    ("Cathy", 29, "Data Analyst"),
    ("David", 35, "Data Scientist"),
]

# Build the DataFrame from the records and their column names
df = spark.createDataFrame(data, columns)

# Display the DataFrame
df.show()


+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



In [5]:
# Project only the "Name" and "Age" columns
selected_df = df.select(["Name", "Age"])

# Keep the rows whose "Age" is strictly greater than 30
is_over_30 = selected_df["Age"] > 30
filtered_df = selected_df.where(is_over_30)

# Display the filtered DataFrame
filtered_df.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 35|
+-----+---+



In [6]:
from pyspark.sql.functions import avg

# Aggregate expression: mean of "Age", exposed as "Average_Age"
average_age = avg("Age").alias("Average_Age")

# One row per "Occupation" with its average age
grouped_df = df.groupBy("Occupation").agg(average_age)

# Display the grouped DataFrame
grouped_df.show()


+--------------+-----------+
|    Occupation|Average_Age|
+--------------+-----------+
|Data Scientist|       34.5|
| Data Engineer|       45.0|
|  Data Analyst|       29.0|
+--------------+-----------+



In [7]:
# Order the per-occupation averages from highest to lowest "Average_Age"
sorted_df = grouped_df.sort("Average_Age", ascending=False)

# Display the sorted DataFrame
sorted_df.show()


+--------------+-----------+
|    Occupation|Average_Age|
+--------------+-----------+
| Data Engineer|       45.0|
|Data Scientist|       34.5|
|  Data Analyst|       29.0|
+--------------+-----------+



In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType




In [6]:
# Python function that buckets an age into a category label
def categorize_age(age):
    """Map an age to one of the labels "Jovem", "Adulto" or "Senior".

    Args:
        age: Numeric age, or None. A null column value is handed to a Python
            UDF as None, and comparing None with an int raises TypeError, so
            None is handled explicitly.

    Returns:
        "Jovem" for ages below 30, "Adulto" for ages from 30 through 40
        inclusive, "Senior" for anything else, and None when ``age`` is None.
    """
    # Propagate a missing age instead of failing on the comparison below
    if age is None:
        return None
    if age < 30:
        return "Jovem"
    if 30 <= age <= 40:
        return "Adulto"
    return "Senior"

# Wrap the Python function as a Spark UDF that returns a string
categorize_age_udf = udf(f=categorize_age, returnType=StringType())

# Derive the "Age_Category" column by applying the UDF to "Age"
age_category = categorize_age_udf(df["Age"])
df_with_category = df.withColumn("Age_Category", age_category)

# Display the DataFrame with the new column
df_with_category.show()

+-----+---+--------------+------------+
| Name|Age|    Occupation|Age_Category|
+-----+---+--------------+------------+
|Alice| 34|Data Scientist|      Adulto|
|  Bob| 45| Data Engineer|      Senior|
|Cathy| 29|  Data Analyst|       Jovem|
|David| 35|Data Scientist|      Adulto|
+-----+---+--------------+------------+



In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg

# Window that groups rows sharing the same "Occupation"
window_spec = Window.partitionBy("Occupation")

# Mean "Age" within each occupation, attached to every row of that occupation
mean_age_in_occupation = avg("Age").over(window_spec)
df_with_mean_age = df.withColumn("Mean_Age", mean_age_in_occupation)

# Difference between each row's "Age" and its occupation mean
age_minus_mean = col("Age") - col("Mean_Age")
df_with_age_diff = df_with_mean_age.withColumn("Age_Difference", age_minus_mean)

# Display the DataFrame with the new columns
df_with_age_diff.show()


+-----+---+--------------+--------+--------------+
| Name|Age|    Occupation|Mean_Age|Age_Difference|
+-----+---+--------------+--------+--------------+
|Cathy| 29|  Data Analyst|    29.0|           0.0|
|  Bob| 45| Data Engineer|    45.0|           0.0|
|Alice| 34|Data Scientist|    34.5|          -0.5|
|David| 35|Data Scientist|    34.5|           0.5|
+-----+---+--------------+--------+--------------+



In [10]:
from pyspark.sql.functions import broadcast

# Lookup table for the join: occupation -> department
occupation_to_department = {
    "Data Scientist": "Science",
    "Data Engineer": "Engineering",
    "Data Analyst": "Analytics",
}
other_data = list(occupation_to_department.items())
other_columns = ["Occupation", "Department"]

other_df = spark.createDataFrame(other_data, schema=other_columns)

# Inner join on "Occupation", with a broadcast hint on the lookup table
lookup = broadcast(other_df)
joined_df = df.join(lookup, on="Occupation", how="inner")

# Display the joined DataFrame
joined_df.show()


+--------------+-----+---+-----------+
|    Occupation| Name|Age| Department|
+--------------+-----+---+-----------+
|Data Scientist|Alice| 34|    Science|
| Data Engineer|  Bob| 45|Engineering|
|  Data Analyst|Cathy| 29|  Analytics|
|Data Scientist|David| 35|    Science|
+--------------+-----+---+-----------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 42672)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =

In [None]:
# Read a CSV file, taking column names from the first row and inferring types
csv_df = spark.read.csv("path/to/csv_file.csv", header=True, inferSchema=True)

# Write the DataFrame as Parquet. The writer's default save mode raises when the
# target already exists, which would make this cell fail on any re-run, so the
# previous output is replaced explicitly.
csv_df.write.mode("overwrite").parquet("path/to/output.parquet")


In [None]:
# Read a CSV file from HDFS, taking column names from the first row
# NOTE(review): in an `hdfs://` URI the segment right after `//` is the
# authority (namenode host), so this placeholder treats `path` as a host name.
# Confirm the real URI, e.g. `hdfs:///...` or `hdfs://<namenode>:<port>/...`.
hdfs_df = spark.read.csv("hdfs://path/to/hdfs_file.csv", header=True, inferSchema=True)

# Save the result back to HDFS. The header is written so the column names read
# above are not lost, and the save mode is set to overwrite because the default
# mode raises when the output directory already exists (e.g. on a re-run).
hdfs_df.write.mode("overwrite").csv("hdfs://path/to/output_dir", header=True)


In [None]:
# Load the log file, taking column names from the first row
# assumes the file has a `user_id` column with one row per action — TODO confirm
logs_df = spark.read.csv("path/to/log_file.csv", header=True, inferSchema=True)

# Count the number of rows (actions) per user
actions_per_user = logs_df.groupBy("user_id").count()

# Keep the 10 users with the highest counts
top_users = actions_per_user.orderBy("count", ascending=False).limit(10)

# Save the result as CSV. The header is written so the `user_id` and `count`
# columns stay labeled, and the save mode is set to overwrite because the
# default mode raises when the output path already exists (e.g. on a re-run).
top_users.write.mode("overwrite").csv("path/to/top_users.csv", header=True)
