In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[4]") \
.appName("Csv-Üzeri-SQL") \
.config("spark.executor.memory","4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

In [4]:
df = spark.read \
.option("header","True") \
.option("inferSchema","True") \
.option("sep",";") \
.csv("sources\\OnlineRetail.csv")

df.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



# Changing type of column withColumn directly #

In [22]:
df2 = df.withColumn("Quantity",df.Quantity.cast(StringType()))
df2.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [5]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [6]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType

manual_schema = StructType(
[
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
]
)

## Rereading the data with manuel schema ##

In [9]:
df2 = spark.read \
.option("header","True") \
.schema(manual_schema) \
.option("sep",";") \
.csv("sources\\OnlineRetail.csv")

df2.show(5)


+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     null|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     null|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     null|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     null|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     null|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



unit price should be replaced with '.' for ','

In [11]:
from pyspark.sql import functions as F

df = spark.read \
.option("header","True") \
.option("inferSchema","True") \
.option("sep",";") \
.csv("sources\\OnlineRetail.csv") \
.withColumn("UnitPrice",F.regexp_replace(F.col("UnitPrice"), ",","."))

df.show(5)


+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [12]:
##write the replaced dataframe to the disk
df \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("sep",";") \
    .option("header","true") \
    .csv("sources\\OnlineRetail2")

In [14]:
df2 = spark.read \
.option("header","True") \
.schema(manual_schema) \
.option("sep",";") \
.csv("sources\\OnlineRetail2")

df2.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [15]:
df2.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

