In [0]:
from pyspark.sql.functions import col, udf, lit, count

In [0]:
# Reset the persisted max id: recursively delete the directory that stores it,
# so the cells below start from a clean state.
dbutils.fs.rm('dbfs:/FileStore/max_id/', recurse=True)

In [0]:
# Build a small sample dataset.
# Column names, in the same order as the values of each row below.
columns = ['Employee_Name', 'Company_Name', 'Password']

# Sample rows; two of them carry a missing value (None) on purpose.
data = [
  ["john", "company 1", 12345],
  ["peter", "company 2", 23423],
  ["bobby", "company 3", None],
  ["tony", "company 2", 77464],
  ["steve", "company 1", 80804],
  ["anna", None, 32432],
]

dataframe = spark.createDataFrame(data, schema=columns)
display(dataframe)

Employee_Name,Company_Name,Password
john,company 1,12345.0
peter,company 2,23423.0
bobby,company 3,
tony,company 2,77464.0
steve,company 1,80804.0
anna,,32432.0


In [0]:
# Register the sample DataFrame as a temporary view so it can be queried with SQL.
dataframe.createOrReplaceTempView("tmp_vw_dataframe")

# Prepend a sequential `id` column (1..N) assigned in Employee_Name order,
# keeping every original column.
# NOTE(review): rows with equal Employee_Name have no tie-breaker in the ORDER BY,
# so their relative ids are not guaranteed to be stable between runs.
row_id_query = """
  select 
    row_number() over(order by Employee_Name) AS id,
    *
  from tmp_vw_dataframe
"""
add_row_id = spark.sql(row_id_query)

display(add_row_id)

id,Employee_Name,Company_Name,Password
1,anna,,32432.0
2,bobby,company 3,
3,john,company 1,12345.0
4,peter,company 2,23423.0
5,steve,company 1,80804.0
6,tony,company 2,77464.0


In [0]:
# Check whether the directory that stores the max id already exists.
# `path_to_max_id` ends up as the directory listing, or [] when listing fails.
try:
  path_to_max_id = dbutils.fs.ls('dbfs:/FileStore/max_id/')
except Exception:
  # Listing failed (presumably because the path does not exist yet), so treat it
  # as "no max id saved". Catching Exception instead of using a bare `except:`
  # keeps KeyboardInterrupt / SystemExit from being silently swallowed.
  path_to_max_id = []

In [0]:
if path_to_max_id == []:

  # The max-id directory could not be listed, so this is treated as the first load:
  # the ids generated by row_number() are used as they are.
  final_df_data = add_row_id

  # Highest id of the first load (None when there are no rows -> fall back to 0)
  initial_max_id = final_df_data.groupby().max('id').collect()[0]["max(id)"]
  if initial_max_id is None:
    initial_max_id = 0
  print(f"initial_max_id: {initial_max_id}")

  # One-row DataFrame holding the max id
  new_file = spark.createDataFrame([[initial_max_id]], ["max_id"])

  # Persist the max id
  new_file.write.mode('overwrite').format('parquet').save('dbfs:/FileStore/max_id/')

  # TODO: save the dataframe
else:
  # Read the max id stored by the previous load (parquet, despite the variable name)
  json_file = spark.read.format('parquet').load('dbfs:/FileStore/max_id/')

  # Materialize the stored value on the driver before anything is overwritten
  old_max_id = json_file.collect()[0]["max_id"]

  print(f"old_max_id: {old_max_id}")

  # Shift the freshly generated ids so they continue after the previous max id
  final_df_data = add_row_id.withColumn('id', col('id') + old_max_id)

  # Highest id after the shift; an empty batch yields None, in which case the
  # previous max id is kept so the counter is not lost
  new_max_id = final_df_data.groupby().max('id').collect()[0]["max(id)"]
  if new_max_id is None:
    new_max_id = old_max_id
  print(f"new_max_id: {new_max_id}")

  # Build the new max-id DataFrame from scratch instead of deriving it from
  # `json_file`: a DataFrame that is lazily read from a path must not be used to
  # overwrite that same path (Spark rejects it or the input is deleted mid-write).
  new_file = spark.createDataFrame([[new_max_id]], ["max_id"])

  # Persist the new max id
  new_file.write.mode('overwrite').format('parquet').save('dbfs:/FileStore/max_id/')

  # TODO: save the dataframe