In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StringType

import pyspark.sql.functions as F

In [2]:
def sparkSession(name_app: str, core_limit: int = None) -> SparkSession:
    """Return a local SparkSession registered under ``name_app``.

    Args:
        name_app: Application name passed to ``appName``.
        core_limit: Number of cores used in the ``local[N]`` master URL. Any
            falsy value (``None`` or ``0``) selects ``local[*]``.

    Returns:
        The session returned by ``getOrCreate()``.
    """
    cores = core_limit if core_limit else '*'
    master_url = f'local[{cores}]'

    builder = SparkSession.builder.master(master_url).appName(name_app)
    return builder.getOrCreate()

In [3]:
def read_files(file_name: str) -> list:
    """List the ``*.json`` files directly inside a directory, in sorted order.

    Args:
        file_name: Path of the directory to scan (despite the parameter name,
            this is a directory). Sub-directories are not searched.

    Returns:
        The matching paths as strings, each prefixed with ``file_name``,
        sorted lexicographically. Empty when nothing matches.
    """
    import glob

    # glob.glob() yields matches in arbitrary, filesystem-dependent order;
    # sorting keeps the processing order reproducible between runs.
    return sorted(glob.glob(f"{file_name}/*.json"))
    

In [4]:
def read_files_json_spark(spark_session: SparkSession, path_file: list | str) -> list[DataFrame]:
    list_spark_frames: list = []

    path_files = [path_file] if not isinstance(path_file, list) else path_file

    for path in path_files:
        frame = spark_session.read.option("multiline","true").json(path)
        list_spark_frames.append(frame.select("name", "packages"))

    return list_spark_frames


In [5]:
def select_explode_columns_spark_frame(dataframe: DataFrame) -> tuple[DataFrame, list[str]]:
    """Explode ``packages`` into one row per package, one column per field.

    Args:
        dataframe: Frame with a ``name`` column and a ``packages`` column.
            ``packages`` is passed to ``F.explode`` and its elements are
            expanded with ``pacotes.*``.

    Returns:
        A ``(frame, columns)`` tuple. ``frame`` holds ``new_name`` (the
        original ``name``) followed by the expanded package fields;
        ``columns`` is ``frame.columns``.
    """
    exploded = dataframe.select(
        F.col("name").alias("new_name"),
        F.explode(F.col("packages")).alias("pacotes"),
    )
    flattened = exploded.select("new_name", "pacotes.*")
    return flattened, flattened.columns

In [6]:
def create_select_columns_spark_frame(dataframe: DataFrame, cols: list) -> DataFrame:
    """Rename the package fields of interest, creating any that are missing.

    ``name``, ``versionInfo`` and ``licenseConcluded`` become
    ``new_biblioteca``, ``versao_biblioteca`` and ``licenca_biblioteca``.

    Args:
        dataframe: Frame to transform.
        cols: Column names considered present in ``dataframe``. A source
            column absent from this list is created as an all-null column
            under its target name instead of being renamed.

    Returns:
        The frame with the three target columns present.
    """
    renames = {
        "name": "new_biblioteca",
        "versionInfo": "versao_biblioteca",
        "licenseConcluded": "licenca_biblioteca",
    }

    for source_col, target_col in renames.items():
        if source_col in cols:
            dataframe = dataframe.withColumnRenamed(source_col, target_col)
        else:
            # An untyped lit(None) produces a NullType column; casting keeps
            # the column a string whether or not the field exists in the input.
            dataframe = dataframe.withColumn(target_col, F.lit(None).cast("string"))

    return dataframe

In [7]:
def union_multiples_spark_frame(list_spark_frames: list) -> DataFrame:
    """Union a list of frames into one, dropping duplicate rows.

    Args:
        list_spark_frames: Non-empty list of frames. ``union`` is called on
            them in list order.

    Returns:
        With a single frame: that frame, unchanged (it is NOT deduplicated).
        With several frames: their union followed by one ``distinct()``.

    Raises:
        IndexError: If the list is empty. The type matches what indexing an
            empty list raises, with an explicit message.
    """
    if not list_spark_frames:
        raise IndexError(
            "union_multiples_spark_frame() needs at least one DataFrame, got an empty list"
        )

    first, *others = list_spark_frames
    if not others:
        return first

    merged = first
    for frame in others:
        merged = merged.union(frame)

    # A single distinct() over the full union yields the same rows as calling
    # distinct() after every union, without adding one step per input frame.
    return merged.distinct()

In [8]:
def transform_spark_frame(list_spark_frames: list) -> DataFrame:
    """Flatten every input frame and combine them into one dependency table.

    Each frame is exploded with ``select_explode_columns_spark_frame``,
    normalised with ``create_select_columns_spark_frame``, filtered, and then
    all frames are combined with ``union_multiples_spark_frame``.

    Args:
        list_spark_frames: Frames exposing ``name`` and ``packages`` columns.

    Returns:
        A frame with the columns ``versao_biblioteca``, ``licenca_biblioteca``,
        ``nome_repositorio``, ``gerenciador_biblioteca`` and
        ``nome_biblioteca``, in that order.
    """
    result_frames = []
    for spark_frame in list_spark_frames:
        dataframe, cols = select_explode_columns_spark_frame(spark_frame)
        dataframe = create_select_columns_spark_frame(dataframe, cols)

        dataframe = dataframe.select(
            "new_name",
            "new_biblioteca",
            "versao_biblioteca",
            "licenca_biblioteca",
        )

        # Drop rows whose package name equals the document name
        # (presumably the entry describing the repository itself - confirm).
        frame = dataframe.filter(dataframe.new_name != dataframe.new_biblioteca)
        result_frames.append(frame)

    final_dataframe = union_multiples_spark_frame(result_frames)

    # new_name is split on '/', and the second piece is the repository name.
    repo_parts = F.split(F.col("new_name"), '/')
    # new_biblioteca is "<manager>:<library>". Splitting with limit=2 cuts only
    # at the first ':' so a library name that itself contains ':' is kept
    # whole instead of being truncated at its second ':'.
    package_parts = F.split(F.col("new_biblioteca"), ':', 2)

    final_dataframe = (
        final_dataframe
        .withColumn("nome_repositorio", repo_parts[1])
        .withColumn("gerenciador_biblioteca", package_parts[0])
        .withColumn("nome_biblioteca", package_parts[1])
    )

    # The raw columns are only needed to derive the three columns above.
    return final_dataframe.drop("new_name", "new_biblioteca")


In [9]:
# Start (or reuse) a local Spark session named 'Sbom' limited to 2 cores.
spark_session = sparkSession(name_app='Sbom', core_limit=2)
spark_session.sparkContext.setLogLevel("ERROR") 


## Meant to remove the warnings shown when the Spark session is initialized, e.g.:
## NOTE(review): this cell's output still shows those warnings, so they appear
## to be emitted before setLogLevel takes effect - confirm.

# 24/06/22 04:04:51 WARN Utils: Your hostname, edsojor resolves to a loopback address: 127.0.1.1; using 192.168.15.6 instead (on interface wlo1)
# 24/06/22 04:04:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
# Setting default log level to "WARN".
# To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
# 24/06/22 04:04:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

24/06/26 10:14:19 WARN Utils: Your hostname, edsojor resolves to a loopback address: 127.0.1.1; using 192.168.15.6 instead (on interface wlo1)
24/06/26 10:14:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/26 10:14:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Sets the Spark log level to ERROR (repeats the call made right after the session was created).
spark_session.sparkContext.setLogLevel("ERROR") 

In [11]:
# Single-file alternative, kept for reference:
# path ="data/sbom_27082024_040547.json"
# Collect every *.json file located directly under the "data" directory.
paths = read_files("data")

In [12]:
# One DataFrame per path, each reduced to the "name" and "packages" columns.
list_spark_dataframes = read_files_json_spark(spark_session, paths)

In [26]:
# Flatten, filter and union all loaded frames into the final dependency table.
spark_dataframe = transform_spark_frame(list_spark_dataframes)
# Preview the first 5 rows without truncating cell values.
spark_dataframe.show(truncate=False, n=5)
# Second preview, this time through an explicit 5-row frame.
teste = spark_dataframe.limit(5)
teste.show(truncate=False)

+-----------------+------------------+-------------------------------------+----------------------+-----------------------+
|versao_biblioteca|licenca_biblioteca|nome_repositorio                     |gerenciador_biblioteca|nome_biblioteca        |
+-----------------+------------------+-------------------------------------+----------------------+-----------------------+
|1.0.3            |MIT               |data-prepper                         |npm                   |has                    |
|0.2.6            |MIT               |data-prepper                         |npm                   |bs-logger              |
|29.7.0           |MIT               |data-prepper                         |npm                   |jest-runner            |
|1.0.0            |MIT               |opensearch-dashboards-functional-test|npm                   |prettier-linter-helpers|
|2.2.3            |MIT               |data-prepper                         |npm                   |json5                  |
+-------

In [14]:
# Print the column names and types of the transformed frame.
spark_dataframe.printSchema()

root
 |-- versao_biblioteca: string (nullable = true)
 |-- licenca_biblioteca: string (nullable = true)
 |-- nome_repositorio: string (nullable = true)
 |-- gerenciador_biblioteca: string (nullable = true)
 |-- nome_biblioteca: string (nullable = true)



In [15]:
# Number of rows in the transformed frame.
spark_dataframe.count()

3064

In [20]:
# Sample employee rows; the fields follow the order of empColumns below.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = [
    "emp_id",
    "name",
    "superior_emp_id",
    "year_joined",
    "emp_dept_id",
    "gender",
    "salary",
]

empDF = spark_session.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

# Sample departments; note there is no department with id 50.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark_session.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



                                                                                

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [22]:
# Syntax reference:
#   join(self, other, on=None, how=None)

# Left anti join: keep only the employee rows whose emp_dept_id matches no dept_id.
result_left_anti = empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="left_anti",
)
result_left_anti.show(truncate=False)

                                                                                

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



In [None]:
def frame_insert(left_frame, right_frame, how_:str='left_anti'):
    """Join two frames on the ``id_check`` column.

    Args:
        left_frame: Frame whose ``join`` method is called.
        right_frame: Frame passed as the other side of the join.
        how_: Join type forwarded as ``how``; defaults to ``'left_anti'``.

    Returns:
        Whatever ``left_frame.join`` returns.
    """
    join_options = {"on": 'id_check', "how": how_}
    return left_frame.join(right_frame, **join_options)

In [45]:
from pyspark.sql import Window
# Demo: number the rows of a 3-row range in ascending id order, offset by one.
df = spark_session.range(3)
w = Window.orderBy(df["id"].asc())
teste = df.withColumn(
    "desc_order",
    F.row_number().over(w) + 1,
)
teste.show()

+---+----------+
| id|desc_order|
+---+----------+
|  0|         2|
|  1|         3|
|  2|         4|
+---+----------+



In [53]:
def soma(a, b, c):
    """Print the three arguments and return ``a + b``.

    Each argument is printed on its own line as ``name=repr(value)``.
    ``c`` is printed but does not take part in the returned sum.
    """
    for label, value in (("a", a), ("b", b), ("c", c)):
        print(f'{label}={value!r}')

    total = a + b
    return total

# Keyword arguments may be passed in any order.
soma(b=3, c=1, a=0)

a=0
b=3
c=1


3

In [54]:
from pyspark.sql.utils import AnalysisException

In [16]:
# desligar/parar a sessão spark 
# spark_session.stop()

In [17]:
# import sys
# from awsglue.transforms import *
# from awsglue.utils import getResolvedOptions
# from pyspark.context import SparkContext
# from awsglue.context import GlueContext
# from awsglue.job import Job
  
# sc = SparkContext.getOrCreate()
# glueContext = GlueContext(sc)
# spark = glueContext.spark_session
# job = Job(glueContext)


# s3output = glueContext.getSink(
#   path="s3://bucket_name/folder_name",
#   connection_type="s3",
#   updateBehavior="UPDATE_IN_DATABASE",
#   partitionKeys=[],
#   compression="snappy",
#   enableUpdateCatalog=True,
#   transformation_ctx="s3output",
# )
# s3output.setCatalogInfo(
#   catalogDatabase="demo", catalogTableName="populations"
# )
# s3output.setFormat("glueparquet")
# s3output.writeFrame(DyF)

## Existem três principais maneiras de desenvolver trabalhos do AWS Glue localmente:

1. **AWS Glue ETL SDK**: Permite que os desenvolvedores escrevam scripts usando a API do Python, proporcionando uma experiência de desenvolvimento familiar e flexível.

2. **AWS Glue PySpark**: Utiliza o PySpark, que é a API Python para o Apache Spark, permitindo que os desenvolvedores criem trabalhos complexos de transformação de dados usando o Spark.

3. **AWS Glue DataBrew**: Oferece uma interface visual que permite aos usuários criar e modificar transformações de dados sem a necessidade de escrever código.

Essas três maneiras oferecem opções para diferentes preferências e necessidades de desenvolvimento.
