**Criando conexão no sistema.**

In [None]:
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
from pyspark.sql import functions as F

# Reuse the current Dataproc Spark Connect session if there is one; otherwise
# a new session is created with the default configuration.
session_builder = DataprocSparkSession.builder
spark = session_builder.getOrCreate()


sc-20251016-133740-gbgefu Dataproc Session is not active, stopping and creating a new one


████████████████████████████████████████████████████████████▊                   







**Lendo tabela da bronze**

In [None]:
# Configure a BigQuery reader for the bronze-layer physicians table,
# then load it as a DataFrame.
bronze_reader = (
    spark.read
    .format("bigquery")
    .option("table", "clinic-de.1_bronze.physicians")
)
df = bronze_reader.load()

# Print the schema tree so the nested `address` and `contacts` structs are visible.
df.printSchema()



root
 |-- address: struct (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- city: string (nullable = true)
 |-- contacts: struct (nullable = true)
 |    |-- phones: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- email: string (nullable = true)
 |-- specialty: string (nullable = true)
 |-- crm: string (nullable = true)
 |-- name: string (nullable = true)
 |-- physician_id: string (nullable = true)



**Selecionando as colunas necessárias do dataframe.**

In [None]:
# Columns kept for the silver table. The "struct.*" entries expand every field
# of the nested `address` and `contacts` structs into top-level columns.
selected_columns = [
    "address.*",
    "contacts.*",
    "specialty",
    "crm",
    "name",
    "physician_id",
]
df_tratado_v1 = df.select(*selected_columns)

**Verificando se há nulos nas colunas.**

In [None]:
# One aggregation per column: isNull() yields a boolean, which is cast to 0/1
# and summed to give the number of null values in that column.
null_counts = [
    F.sum(F.col(column_name).isNull().cast("integer")).alias(column_name)
    for column_name in df_tratado_v1.columns
]
df_tratado_v1.select(null_counts).show()

+-------+-----+----+------+-----+---------+---+----+------------+
|country|state|city|phones|email|specialty|crm|name|physician_id|
+-------+-----+----+------+-----+---------+---+----+------------+
|      0|    0|   0|     0|    0|        0|  0|   0|           0|
+-------+-----+----+------+-----+---------+---+----+------------+



In [None]:
# Keep only the first phone of each physician, replacing the `phones` array
# with a single string value.
# F.get (0-based) returns NULL when the index is outside the array, so an empty
# `phones` array becomes NULL. Column.getItem(0) gives the same result with ANSI
# mode off, but raises an out-of-bounds error when ANSI mode is enabled.
df_tratado_v2 = df_tratado_v1.withColumn("phones", F.get(F.col("phones"), 0))


**Removendo todas as abreviações/títulos terminados em ponto no começo do nome com regex.**

In [None]:
# Strip every leading token that ends with a period and is followed by
# whitespace (e.g. a title such as "Dr. "). The `+` quantifier removes several
# such tokens in a row.
# NOTE(review): leading initials like "J. K." also match and are removed —
# confirm that is acceptable for this dataset.
name_without_titles = F.regexp_replace(F.col("name"), r"^(?:\s*\S+\.\s+)+", "")

# Trim any whitespace left around the cleaned name.
df_tratado_v3 = df_tratado_v2.withColumn("name", F.trim(name_without_titles))


**Salvando dataframe na silver.**

In [None]:
# Configure the BigQuery writer for the silver-layer physicians table.
# Mode "overwrite" replaces the existing table contents, and writeMethod
# "direct" is the connector option selected for the write path.
silver_writer = (
    df_tratado_v3.write
    .format("bigquery")
    .option("table", "clinic-de.2_silver.physicians")
    .option("writeMethod", "direct")
    .mode("overwrite")
)
silver_writer.save()

In [None]:
# Stop the Spark session now that the silver table has been written.
spark.stop()