In [1]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('data_quality')\
    .config("spark.master", 'local[4]')\
    .config("spark.shuffle.sql.partitions", 1)\
    .getOrCreate()

In [5]:
spark

In [6]:
cols = ['id', 'name', 'x']
l = [
    (1, 'A', 'a'),
    (2, 'B', 'b'),
    (3, 'C', 'c'),
    (4, 'D', 'd'),
]

In [7]:
df = spark.createDataFrame(l, schema=cols)

In [8]:
df.count()

4

In [9]:
df.show()

+---+----+---+
| id|name|  x|
+---+----+---+
|  1|   A|  a|
|  2|   B|  b|
|  3|   C|  c|
|  4|   D|  d|
+---+----+---+



In [10]:
df = spark.read.csv('../landing_zone/persistent/acled_20211224.csv', sep=',', header=True)

In [11]:
df.count()

1232494

In [14]:
# df.show()
df.printSchema()

root
 |-- data_id: string (nullable = true)
 |-- iso: string (nullable = true)
 |-- event_id_cnty: string (nullable = true)
 |-- event_id_no_cnty: string (nullable = true)
 |-- event_date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- time_precision: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sub_event_type: string (nullable = true)
 |-- actor1: string (nullable = true)
 |-- assoc_actor_1: string (nullable = true)
 |-- inter1: string (nullable = true)
 |-- actor2: string (nullable = true)
 |-- assoc_actor_2: string (nullable = true)
 |-- inter2: string (nullable = true)
 |-- interaction: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- admin1: string (nullable = true)
 |-- admin2: string (nullable = true)
 |-- admin3: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- geo_precision:

In [17]:
df.select('iso3').distinct().show()

+----+
|iso3|
+----+
| PSE|
| HTI|
| POL|
| LVA|
| BRB|
| JAM|
| ZMB|
| BRA|
| ARM|
| MOZ|
| CUB|
| JOR|
| SOM|
| FRA|
| ABW|
| TCA|
| COD|
| BRN|
| BOL|
| URY|
+----+
only showing top 20 rows



In [18]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-------+---+-------------+----------------+----------+----+--------------+----------+--------------+------+-------------+------+------+-------------+------+-----------+------+-------+------+------+------+--------+--------+---------+-------------+------+------------+-----+----------+---------+----+
|data_id|iso|event_id_cnty|event_id_no_cnty|event_date|year|time_precision|event_type|sub_event_type|actor1|assoc_actor_1|inter1|actor2|assoc_actor_2|inter2|interaction|region|country|admin1|admin2|admin3|location|latitude|longitude|geo_precision|source|source_scale|notes|fatalities|timestamp|iso3|
+-------+---+-------------+----------------+----------+----+--------------+----------+--------------+------+-------------+------+------+-------------+------+-----------+------+-------+------+------+------+--------+--------+---------+-------------+------+------------+-----+----------+---------+----+
|      0|  0|            0|               0|         0|   0|             0|         0|             0

In [20]:
import pandas as pd
dg = pd.read_csv('../landing_zone/persistent/acled_20211224.csv')

In [21]:
dg.isna().sum()

data_id                   0
iso                       0
event_id_cnty             0
event_id_no_cnty          0
event_date                0
year                      0
time_precision            0
event_type                0
sub_event_type            0
actor1                    0
assoc_actor_1        765942
inter1                    0
actor2               602015
assoc_actor_2       1073485
inter2                    0
interaction               0
region                    0
country                   0
admin1                   84
admin2                48240
admin3               643975
location                  0
latitude                  0
longitude                 0
geo_precision             0
source                    0
source_scale              0
notes                    19
fatalities                0
timestamp                 0
iso3                      0
dtype: int64

In [24]:
import pyspark.sql.functions as funcs

df.groupBy(df.columns)\
    .count()\
    .where(funcs.col('count') > 1)\
    .select(funcs.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+



In [22]:
dg.duplicated().sum()

0