In [None]:
from pyspark.sql import SparkSession
import random
from datetime import datetime, timedelta
import os
import string
from pyspark.sql.functions import when, col
from pyspark.sql.types import StringType

# инициализация sparksession
spark=SparkSession.builder \
.appName("Generate CSV") \
.getOrCreate()

# Ввод количества записей
num_rows = int(input('Введите количество записей, необходимых для генерации: '))

#словари с именами и городами
names = ["Aleksandr", "Sasha", "Oleg", "Ivan", "Sergey", "Mikhail", "Nastya", "Eleanor", "Frederick", "Gabrielle",
         "Harrison", "Isabelle", "Artem", "Maksim", "Igor", "Maria", "Elena", "Marina"]
cities = ["Samara", "Peterburg", "Chicago", "Astrakhan", "Saratov",
          "Moscow", "Vladivostok", "SanDiego", "Khabarovsk", "Krasnoyarsk", "Rostov", "Krasnodar", "Ekaterinburg"]


# Генерация случайного имени с минимальной длиной 5 символов
# def generate_random_name():
#     return ''.join(random.choices(string.ascii_lowercase, k=random.randint(5, 10)))

# Генерация случайного города с минимальной длиной 7 символов
# def generate_random_city():
#     return ''.join(random.choices(string.ascii_lowercase, k=random.randint(7, 12)))

def generate_random_registration_date():
    end_date = datetime.now()
    start_date = end_date - timedelta(days=5*365)
    random_date = start_date + (end_date - start_date) * random.random()
    return random_date.strftime("%Y-%m-%d")

# функция выбора случайного имени из словаря (списка)
def random_name():
  return random.choice([name for name in names if len(name) >= 5])

#  функция выбора случайного города из словаря (списка)
def random_city():
  return random.choice([city for city in cities if len(city) >= 7])

# функция для генерации данных
def data_gen(num_rows):
  data=[]
  for i in range(num_rows):
        n=i+1
        name = random_name()
        city = random_city()
        registration_date = generate_random_registration_date()
        row = (i+1, name, f"{name}_{n}@somemail.com", city, random.randint(18, 95),
               random.randint(300, 2000)*100, registration_date)
        data.append(row)
  return data

# функция генерации случайного количества строк , которое будет содержать значения null
def num_string(num_rows):
   num_nulls_temp = num_rows * random.randint(1 , 5)/100
   return num_nulls_temp

# функция проверки укладывается ли количесвто null строк в 5 %
def num_string_check(num_nulls_temp):
  if num_nulls_temp <= num_rows/20:
     num_nulls = int(num_nulls_temp)
  else:
     num_nulls = 0
  return num_nulls

# функция выбора случайных id-шников строк, которые будет содержать значения null
def null_indices_get(id_list, num_nulls):
  null_indices = random.sample(id_list, num_nulls)
  return null_indices

# генерируем данные с помощью ранее написанной функции
data=data_gen(num_rows)

# создаем датафрейм
columns = ['id', 'name', 'email', 'city', 'age', 'salary', 'registration_date']
df = spark.createDataFrame(data, schema=columns)

# создаем список значений id
id_list=[]
for i in range(num_rows):
  id_list.append(data[i][0])

# примененние null к случайным строкам в колонке city (не более 5% строк)
num_nulls_temp = num_string(num_rows)
num_nulls = num_string_check(num_nulls_temp)
null_indices = null_indices_get(id_list, num_nulls)
# print(null_indices)    # выводим список индексов с null
df=df.withColumn('city', when(col('id').isin(null_indices),None).otherwise(col('city')))

# примененние null к случайным строкам в колонке email (не более 5% строк)
num_nulls_temp = num_string(num_rows)
num_nulls = num_string_check(num_nulls_temp)
null_indices = null_indices_get(id_list, num_nulls)
# print(null_indices)    # выводим список индексов с null
df=df.withColumn('email', when(col('id').isin(null_indices),None).otherwise(col('email')))

# примененние null к случайным строкам в колонке registration_date (не более 5% строк)
num_nulls_temp = num_string(num_rows)
num_nulls = num_string_check(num_nulls_temp)
null_indices = null_indices_get(id_list, num_nulls)
# print(null_indices)    # выводим список индексов с null
df=df.withColumn('registration_date', when(col('id').isin(null_indices),None).otherwise(col('registration_date')))

# получаем текущую дату и время
cur_date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# сохряняем датафрейм в единый файл
output_path = f"{cur_date}-dev.csv"
df.coalesce(1).write.mode("overwrite").csv(output_path, header=True)

# переименовываем csv файл
csv_file = [f for f in os.listdir(output_path) if f.endswith('.csv')]
old_file_name = csv_file[0]
old_file_path = os.path.join(output_path, old_file_name)
new_file_name = f"{cur_date}-dev.csv"
new_file_path = os.path.join(output_path, new_file_name)

os.rename(old_file_path, new_file_path)

print(f'CSV файл сохранен по пути: {output_path}')

spark.stop()