### Połączenie

In [5]:
from pyspark.sql import SparkSession, Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.pandas as ps

try:
    # Tworzenie sesji Spark
    spark = SparkSession.builder \
        .appName("PySpark SQL Server Connection") \
        .config("spark.jars", "mssql-jdbc-12.6.1.jre8.jar") \
        .getOrCreate()

    # Parametry połączenia z bazą danych MSSQL
    server_name = "localhost"
    port = "1433"
    database_name = "NowaBazaDanych2"
    url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

    table_name = "Pracownicy15"
    username = "sa"
    password = "YourStrongPassword123"

    # Wczytanie danych z bazy danych MSSQL
    df = spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .option("encrypt", "false") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .load()

    print("Dane zostały pomyślnie wczytane z MSSQL.")
    # Wyświetlenie pierwszych kilku wierszy DataFrame
    df.show()

except Exception as e:
    print("Wystąpił błąd podczas łączenia z bazą danych:", str(e))



Wystąpił błąd podczas łączenia z bazą danych: An error occurred while calling o69.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host localhost, port 1433 has failed. Error: "Connection refused. Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:233)
	at com.microsoft.sqlserver.jdbc.SQLServerException.convertConnectExceptionToSQLServerException(SQLServerException.java:284)
	at com.microsoft.sqlserver.jdbc.SocketFinder.findSocket(IOBuffer.java:2589)
	at com.microsoft.sqlserver.jdbc.TDSChannel.open(IOBuffer.java:721)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:3763)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnectio

### ETL 1 - Przygotowanie dat do późniejszej analizy

In [670]:
df.select('Date Rptd').show(10)

+--------------------+
|           Date Rptd|
+--------------------+
|01/08/2020 12:00:...|
|04/22/2020 12:00:...|
|01/27/2020 12:00:...|
|01/24/2020 12:00:...|
|01/20/2020 12:00:...|
|01/24/2020 12:00:...|
|01/05/2020 12:00:...|
|09/06/2020 12:00:...|
|10/10/2020 12:00:...|
|06/26/2020 12:00:...|
+--------------------+
only showing top 10 rows



In [671]:
from pyspark.sql.functions import split, col

df = df.withColumn("Date Rptd", split(col("Date Rptd"), " ").getItem(0))
df = df.withColumn("DATE OCC", split(col("DATE OCC"), " ").getItem(0))

In [672]:
df.select('Date Rptd').show()

+----------+
| Date Rptd|
+----------+
|01/08/2020|
|04/22/2020|
|01/27/2020|
|01/24/2020|
|01/20/2020|
|01/24/2020|
|01/05/2020|
|09/06/2020|
|10/10/2020|
|06/26/2020|
|11/12/2020|
|09/22/2020|
|01/22/2020|
|03/16/2022|
|03/20/2020|
|06/24/2020|
|09/04/2020|
|02/15/2020|
|01/14/2020|
|02/10/2020|
+----------+
only showing top 20 rows



In [673]:
# Konwersja daty do formatu DateType
df = df.withColumn("data", to_date("Date Rptd", "MM/dd/yyyy"))
df = df.withColumn("data", to_date("DATE OCC", "MM/dd/yyyy"))

# Formatowanie daty na "dzień miesiąc rok" po pauzach
df = df.withColumn("Date Rptd", date_format("data", "dd-MM-yyyy"))
df = df.withColumn("DATE OCC", date_format("data", "dd-MM-yyyy"))

# Wyświetlenie wyniku
df.show(truncate=False)

+---------+----------+----------+--------+----+-----------+-----------+--------+------+-------------------------------------------------------+------------------------+--------+--------+------------+---------+--------------------------------------------------+--------------+----------------------------------------------+------+------------+--------+--------+--------+--------+----------------------------------------+-------------------------------+-------+---------+----------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                                            |Mocodes                 |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                       |Weapon Used Cd|Weapon Desc                                   |Status|Status Desc |Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION                                |Cross Street                   |LAT    |LON      |data      |
+---------+----------+----------+-----

In [674]:
df.show(10)

+---------+----------+----------+--------+----+-----------+-----------+--------+------+--------------------+-------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+-----------+--------+--------+--------+--------+--------------------+--------------------+-------+---------+----------+
|    DR_NO| Date Rptd|  DATE OCC|TIME OCC|AREA|  AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|            Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status|Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|        Cross Street|    LAT|      LON|      data|
+---------+----------+----------+--------+----+-----------+-----------+--------+------+--------------------+-------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+-----------+--------+--------+--------+--------+-------------

In [675]:
# Wyświetlenie początkowych danych
print("Przed zastąpieniem:")
df.select('Vict Sex').distinct().show()

Przed zastąpieniem:
+--------+
|Vict Sex|
+--------+
|       F|
|       M|
|       X|
|       H|
|    NULL|
+--------+



In [676]:
count_H = df.filter(df['Vict Sex'].isNotNull()).count()
print(count_H)
count_H = df.filter(df['Vict Sex'] == 'H').count()
print(count_H)

65359
12


In [677]:
outed_data = (df.filter((df['Vict Sex'] == 'H')))
df = df.filter((df['Vict Sex'] != 'H'))

# Aktualizacja wartości w kolumnie 'Vict Sex'
df = df.withColumn("Vict Sex", 
                   when(df["Vict Sex"] == "F", "Female")
                   .when(df["Vict Sex"] == "M", "Male")
                   .when(df["Vict Sex"] == "X", "X-gender")
                   .when(df["Vict Sex"].isNull(), "no data")
                   .otherwise(df["Vict Sex"]))

df.printSchema()

root
 |-- DR_NO: long (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: long (nullable = true)
 |-- AREA: long (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: long (nullable = true)
 |-- Part 1-2: long (nullable = true)
 |-- Crm Cd: long (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: long (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: double (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: double (nullable = true)
 |-- Crm Cd 2: double (nullable = true)
 |-- Crm Cd 3: double (nullable = true)
 |-- Crm Cd 4: double (nullable = true)
 |-- LOCATION: string (nullable =

In [678]:
df.select('Vict Age')

DataFrame[Vict Age: bigint]

In [679]:
df.select('Vict Descent').distinct().show()

+------------+
|Vict Descent|
+------------+
|           K|
|           F|
|           B|
|           L|
|           V|
|           U|
|           O|
|           D|
|           C|
|           J|
|           Z|
|           A|
|           X|
|           W|
|           S|
|           G|
|           I|
|           P|
|           H|
|        NULL|
+------------+



In [680]:
counts_df = df.groupBy("Vict Descent").count()
counts_df.show()

+------------+-----+
|Vict Descent|count|
+------------+-----+
|           K|  373|
|           F|  298|
|        NULL|    1|
|           B|10804|
|           L|    2|
|           V|   65|
|           U|    9|
|           O| 5888|
|           D|   10|
|           C|  257|
|           J|  114|
|           Z|   44|
|           A| 1648|
|           X| 7152|
|           W|15423|
|           S|    4|
|           G|    2|
|           I|   69|
|           P|   16|
|           H|23168|
+------------+-----+



In [681]:
df_gb = df.groupBy("Vict Descent").count()

df_gb = df_gb.orderBy('count', ascending=False)

start_row = 6

df_gb = df_gb.withColumn("row_index", monotonically_increasing_id())

# Przypisanie elementów od 6. wiersza
others = df_gb.filter(df_gb["row_index"] >= start_row)

top_five = df_gb.filter(df_gb['row_index'] < start_row)

In [682]:
top_five.show()

+------------+-----+---------+
|Vict Descent|count|row_index|
+------------+-----+---------+
|           H|23168|        0|
|           W|15423|        1|
|           B|10804|        2|
|           X| 7152|        3|
|           O| 5888|        4|
|           A| 1648|        5|
+------------+-----+---------+



In [683]:
suma = others.agg(sum("count")).collect()[0][0]
suma

data = [('Other', suma, 6)]

# Tworzenie ramki danych
df_sum = spark.createDataFrame(data, ["Vict Descent", "count", 'row_index'])

df_sum.show()

+------------+-----+---------+
|Vict Descent|count|row_index|
+------------+-----+---------+
|       Other| 1264|        6|
+------------+-----+---------+



In [684]:
combined_df = df_sum.union(top_five)
combined_df.show()

+------------+-----+---------+
|Vict Descent|count|row_index|
+------------+-----+---------+
|       Other| 1264|        6|
|           H|23168|        0|
|           W|15423|        1|
|           B|10804|        2|
|           X| 7152|        3|
|           O| 5888|        4|
|           A| 1648|        5|
+------------+-----+---------+



In [685]:
top_five = top_five.filter(top_five["Vict Descent"] != 'O')
top_five.show()

+------------+-----+---------+
|Vict Descent|count|row_index|
+------------+-----+---------+
|           H|23168|        0|
|           W|15423|        1|
|           B|10804|        2|
|           X| 7152|        3|
|           A| 1648|        5|
+------------+-----+---------+



In [686]:
tfv = top_five.select('Vict Descent')
tfv.show()

+------------+
|Vict Descent|
+------------+
|           H|
|           W|
|           B|
|           X|
|           A|
+------------+



In [687]:
from pyspark.sql.functions import when

# Utworzenie warunku
top_five_values = top_five.select('Vict Descent').distinct().collect()
top_five_values = [row['Vict Descent'] for row in top_five_values]
condition = ~df['Vict Descent'].isin(top_five_values)

# Tworzenie nowej kolumny na podstawie warunku
df = df.withColumn('Vict Descent', when(condition, 'Other').otherwise(df['Vict Descent']))

print(top_five_values)

['H', 'W', 'B', 'X', 'A']


In [688]:
df = df.withColumn("Vict Descent", 
                   when(df["Vict Descent"] == "H", "Hispanic or Latino")
                   .when(df["Vict Descent"] == "W", "White")
                   .when(df["Vict Descent"] == "B", "Black")
                   .when(df["Vict Descent"] == "X", "X-gender")
                   .when(df["Vict Descent"] == "O", "X-gender")
                   .when(df["Vict Descent"] == "A", "Asian")
                   .otherwise(df["Vict Descent"]))

In [689]:
df.select('Vict Age').distinct().show(200)

+--------+
|Vict Age|
+--------+
|      26|
|      29|
|      65|
|      54|
|      19|
|       0|
|      22|
|       7|
|      77|
|      34|
|      50|
|      94|
|      57|
|      43|
|      32|
|      84|
|      31|
|      39|
|      98|
|      25|
|      95|
|      71|
|      68|
|       6|
|      72|
|      87|
|      58|
|       9|
|      27|
|      63|
|      51|
|      56|
|      52|
|      17|
|      79|
|      41|
|      33|
|      28|
|      88|
|       5|
|      96|
|      -2|
|      10|
|      89|
|      85|
|      48|
|      67|
|      44|
|      61|
|       3|
|      37|
|      83|
|      12|
|      55|
|      74|
|       8|
|      62|
|      49|
|      11|
|      35|
|      80|
|       2|
|      66|
|      76|
|       4|
|      92|
|      13|
|      36|
|      75|
|      78|
|      18|
|      69|
|      14|
|      21|
|      59|
|      15|
|      81|
|      38|
|      82|
|      97|
|      30|
|      42|
|      73|
|      90|
|      23|
|      46|
|      70|
|      20|

In [690]:
from pyspark.ml.feature import Bucketizer

df = df.filter(df['Vict Age'] >= 0)
bucketizer = Bucketizer(splits=[0, 18, 35, 60,120, float('Inf') ],inputCol="Vict Age", outputCol="Vict Age bucket")
df = bucketizer.setHandleInvalid("keep").transform(df)

# Aktualizacja wartości w kolumnie 'Vict Age bucket'
df = df.withColumn("Vict Age bucket", 
                   when(df["Vict Age bucket"] == 0.0, "0-18")
                   .when(df["Vict Age bucket"] == 1.0, "18-35")
                   .when(df["Vict Age bucket"] == 2.0, "35-60")
                   .when(df["Vict Age bucket"] == 3.0, "60-120")
                   .otherwise("Unknown"))

df.select('Vict Age bucket').show()

+---------------+
|Vict Age bucket|
+---------------+
|          35-60|
|          35-60|
|           0-18|
|          18-35|
|          35-60|
|          18-35|
|          35-60|
|          35-60|
|          35-60|
|           0-18|
|          35-60|
|          35-60|
|         60-120|
|          18-35|
|          35-60|
|          35-60|
|           0-18|
|         60-120|
|          18-35|
|          35-60|
+---------------+
only showing top 20 rows



In [691]:
server_name = "localhost"
port = "1433"
database_name = "After_ETL"
url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

table_name = "Clean_table1"
username = "sa"
password = "YourStrongPassword123"

df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .option("encrypt", "false") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .mode("append") \
    .save()

In [692]:
# Zakończenie sesji Spark
#spark.stop()