In [0]:
import pyspark.sql.functions as F
import os

# Storage credentials come from the environment so no secrets live in the notebook.
STORAGE_ACCOUNT = os.getenv('STORAGE_ACCOUNT')
STORAGE_ACCOUNT_KEY = os.getenv('STORAGE_ACCOUNT_KEY')
if not STORAGE_ACCOUNT or not STORAGE_ACCOUNT_KEY:
    raise RuntimeError(
        'Set the STORAGE_ACCOUNT and STORAGE_ACCOUNT_KEY environment variables '
        'before running this notebook.'
    )

# The ABFS driver looks the account key up under
# `fs.azure.account.key.<account>.dfs.core.windows.net`; using the bare account
# name as the config key is not recognised. If STORAGE_ACCOUNT already holds the
# fully-qualified config key, it is used as-is.
_ACCOUNT_KEY_PREFIX = 'fs.azure.account.key.'
if STORAGE_ACCOUNT.startswith(_ACCOUNT_KEY_PREFIX):
    STORAGE_ACCOUNT_CONF_KEY = STORAGE_ACCOUNT
else:
    STORAGE_ACCOUNT_CONF_KEY = f'{_ACCOUNT_KEY_PREFIX}{STORAGE_ACCOUNT}.dfs.core.windows.net'
spark.conf.set(STORAGE_ACCOUNT_CONF_KEY, STORAGE_ACCOUNT_KEY)


In [0]:
# Source CSV: reported cases of violence against women in Turkey, with a header
# row and column types inferred by Spark.
# NOTE(review): the file name contains a space ("against _women") — confirm it
# matches the blob name in the container rather than being a typo.
TURKEY_CSV_PATH = "abfss://datasets@tfmstorageacc.dfs.core.windows.net/violence_against _women_turkey.csv"
turkey_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(TURKEY_CSV_PATH)
)
turkey_df.display()

For this dataset we need to extract the year from the `Date` column, then group the cases by victim age group and by province.


In [0]:
# Derive the calendar year from the day-first `Date` strings. Rows whose date
# does not match dd/MM/yyyy are expected to yield a null year and are dropped below.
# NOTE(review): null-on-failure assumes non-ANSI parsing; with ANSI mode, or with
# Spark 3's default timeParserPolicy=EXCEPTION for inputs only the legacy parser
# accepts (e.g. unpadded days like 1/5/2015), to_date raises instead — confirm
# the cluster settings.
date_parsed = F.to_date(F.col('Date'), 'dd/MM/yyyy')
turkey_df = turkey_df.withColumn('year', F.year(date_parsed))
turkey_df.select('Date', 'year').display()

There are some incorrectly formatted dates in the dataset. The errors and typos are so varied that cleaning them up is impractical,
so we drop those rows.

In [0]:
# Discard rows whose `Date` could not be turned into a year.
turkey_df = turkey_df.dropna(subset=['year'])

Next, bucket the victims' ages into age groups.

In [0]:
# Inspect every distinct raw value of the victim-age column before bucketing it.
turkey_df.select(F.col('Age of Victim')).dropDuplicates().display()

In [0]:
# Bucket ages that cast to int into 'child' (< 18) or 'adult'. Values that do not
# cast to int (free-text labels, nulls) pass through unchanged so they can be
# inspected and mapped in the next cell.
age_as_int = F.col("Age of Victim").cast("int")
turkey_df_grouped_age = turkey_df.withColumn(
    'age_group',
    F.when(age_as_int.isNull(), F.col("Age of Victim"))
    .when(age_as_int < 18, 'child')
    .otherwise('adult'),
)
turkey_df_grouped_age.select('age_group').distinct().display()

In [0]:
# Keep only labels we can interpret and fold the free-text ones into the two
# buckets: 'of age' -> 'adult', 'Not Rashid' -> 'child'.
# ('Not Rashid' presumably renders Turkish "reşit değil", i.e. a minor — TODO confirm.)
age_label = F.col('age_group')
turkey_df_grouped_age = (
    turkey_df_grouped_age
    .filter(age_label.isin('child', 'adult', 'of age', 'Not Rashid'))
    .withColumn(
        'age_group',
        F.when(age_label == 'of age', 'adult')
        .when(age_label == 'Not Rashid', 'child')
        .otherwise(age_label),
    )
)
turkey_df_grouped_age.select('age_group').distinct().display()

In [0]:
# Rows left after age cleaning; used to judge whether a per-age breakdown is viable.
rows_with_age_group = turkey_df_grouped_age.count()
rows_with_age_group

Too few rows remain after this filtering for a meaningful age breakdown, so we bin all age groups into a single 'any' group. Next, aggregate the cases by year and province.

In [0]:
# Yearly case counts per province, tagged with the country name.
# NOTE(review): the markdown says age groups are binned into 'any', but no
# `age_group` column is carried into this aggregation — confirm whether it
# should be. The all-provinces cell unions with this frame, so both cells
# would need to change together.
turkey_df_aggregated = (
    turkey_df
    .select(
        F.col('year'),
        F.lit('turkiye').alias('country'),  # current official name of the country
        F.col('Province').alias('province'),
    )
    .groupBy('year', 'country', 'province')
    .agg(F.count('*').alias('total_cases'))
)
turkey_df_aggregated.display()


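Finally, aggregate across all provinces to get national yearly totals (province = 'all') and combine them with the per-province rows.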

In [0]:
# National totals: collapse every province into a single 'all' row per year/country.
turkey_df_all = (
    turkey_df_aggregated
    .groupBy('year', 'country')
    .agg(F.sum('total_cases').alias('total_cases'))
    .select('year', 'country', F.lit('all').alias('province'), 'total_cases')
)

# Per-province rows plus national totals in one frame. Kept in a variable so it
# can be reused after display; unionByName matches columns by name rather than
# position, so a future column reorder cannot silently misalign the data.
turkey_df_final = turkey_df_aggregated.unionByName(turkey_df_all)
turkey_df_final.display()