In [0]:
def file_exists(path):
  try:
    dbutils.fs.ls(path)
    return True 
  except Exception as e:
    if 'java.io.FileNotFoundException' in str(e):
      return False
    else:
      raise

In [0]:
actorsUrl = "https://raw.githubusercontent.com/cegladanych/azure_bi_data/main/IMDB_movies/actors.csv"
filePath = "/FileStore/tables/Files/"
dbutils.fs.mkdirs(filePath)
actorsFile = "actors.csv"
tmp = "file:/tmp/"
dbfsdestination = "dbfs:/FileStore/tables/Files/"

# Read DataFrame with Schema

In [0]:
import urllib.request

if (file_exists(filePath + actorsFile) == False):
  urllib.request.urlretrieve(actorsUrl,"/tmp/" + actorsFile)
  dbutils.fs.mv(tmp + actorsFile,dbfsdestination + actorsFile)

In [0]:
filePath = "dbfs:/FileStore/tables/Files/actors.csv"
actorsDf = spark.read.format("csv") \
            .option("header","true") \
            .option("inferSchema","true") \
            .load(filePath)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

actors_schema = StructType([
    StructField("imdb_title_id", StringType(), False),
    StructField("ordering", IntegerType(), True),
    StructField("imb_name_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("job", StringType(), True),
    StructField("characters", StringType(), True),
])

In [0]:
actorsWithSchema = spark.read.format("csv") \
            .option("header","true") \
            .option("inferSchema","true") \
            .schema(actors_schema) \
            .load(filePath) 

actorsWithSchema.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- imb_name_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [0]:
actorsWithSchema.show()

+-------------+--------+-----------+---------------+----------+--------------------+
|imdb_title_id|ordering|imb_name_id|       category|       job|          characters|
+-------------+--------+-----------+---------------+----------+--------------------+
|    tt0000009|       1|  nm0063086|        actress|      null|[Miss Geraldine H...|
|    tt0000009|       2|  nm0183823|          actor|      null|      [Mr. Hamilton]|
|    tt0000009|       3|  nm1309758|          actor|      null|[Chauncey Depew -...|
|    tt0000009|       4|  nm0085156|       director|      null|                null|
|    tt0000574|       1|  nm0846887|        actress|      null|        [Kate Kelly]|
|    tt0000574|       2|  nm0846894|          actor|      null|     [School Master]|
|    tt0000574|       3|  nm3002376|          actor|      null|        [Steve Hart]|
|    tt0000574|       4|  nm0170118|        actress|      null|                null|
|    tt0000574|       5|  nm0846879|       director|      null|  

# Read Modes

Wykorzystaj posiadane pliki bądź użyj nowe.  Użyj Sparka do odczytania jednego pliku i użyj wszystkich typów read modes. Poprawny plik nie wywoła żadnych efektów, więc popsuj dane tak aby każda z read modes zadziałał.

In [0]:
# Corrupt data
from pyspark.sql import SparkSession

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(filePath)

from pyspark.sql.functions import when

df = df.withColumn("ordering", when(df["ordering"] == 4, "xyz"))

df.show()

+-------------+--------+------------+---------------+----------+--------------------+
|imdb_title_id|ordering|imdb_name_id|       category|       job|          characters|
+-------------+--------+------------+---------------+----------+--------------------+
|    tt0000009|    null|   nm0063086|        actress|      null|[Miss Geraldine H...|
|    tt0000009|    null|   nm0183823|          actor|      null|      [Mr. Hamilton]|
|    tt0000009|    null|   nm1309758|          actor|      null|[Chauncey Depew -...|
|    tt0000009|     xyz|   nm0085156|       director|      null|                null|
|    tt0000574|    null|   nm0846887|        actress|      null|        [Kate Kelly]|
|    tt0000574|    null|   nm0846894|          actor|      null|     [School Master]|
|    tt0000574|    null|   nm3002376|          actor|      null|        [Steve Hart]|
|    tt0000574|     xyz|   nm0170118|        actress|      null|                null|
|    tt0000574|    null|   nm0846879|       director| 

In [0]:
print(filePath)

dbfs:/FileStore/tables/Files/actors.csv


In [0]:
output_path = 'dbfs:/FileStore/tables/Files/actors_corrupted.csv'

df.write.mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)

In [0]:
filePath_corrupted = "dbfs:/FileStore/tables/Files/actors_corrupted.csv"

actorsWithSchema = spark.read.format("csv") \
            .option("header","true") \
            .option("inferSchema","true") \
            .schema(actors_schema) \
            .option("mode", "DROPMALFORMED") \
            .load(filePath_corrupted) 

display(actorsWithSchema)

imdb_title_id,ordering,imb_name_id,category,job,characters
tt0000009,,nm0063086,actress,,[Miss Geraldine Holbrook (Miss Jerry)]
tt0000009,,nm0183823,actor,,[Mr. Hamilton]
tt0000009,,nm1309758,actor,,[Chauncey Depew - the Director of the New York Central Railroad]
tt0000574,,nm0846887,actress,,[Kate Kelly]
tt0000574,,nm0846894,actor,,[School Master]
tt0000574,,nm3002376,actor,,[Steve Hart]
tt0000574,,nm0846879,director,,
tt0000574,,nm0317210,producer,producer,
tt0000574,,nm0425854,producer,producer,
tt0000574,,nm0846911,producer,producer,


In [0]:
actorsWithSchema = spark.read.format("csv") \
            .option("header","true") \
            .option("inferSchema","true") \
            .schema(actors_schema) \
            .option("mode", "PERMISSIVE") \
            .load(filePath_corrupted) 

display(actorsWithSchema)

imdb_title_id,ordering,imb_name_id,category,job,characters
tt0000009,,nm0063086,actress,,[Miss Geraldine Holbrook (Miss Jerry)]
tt0000009,,nm0183823,actor,,[Mr. Hamilton]
tt0000009,,nm1309758,actor,,[Chauncey Depew - the Director of the New York Central Railroad]
tt0000009,,nm0085156,director,,
tt0000574,,nm0846887,actress,,[Kate Kelly]
tt0000574,,nm0846894,actor,,[School Master]
tt0000574,,nm3002376,actor,,[Steve Hart]
tt0000574,,nm0170118,actress,,
tt0000574,,nm0846879,director,,
tt0000574,,nm0317210,producer,producer,


In [0]:
actorsWithSchema = spark.read.format("csv") \
            .option("header","true") \
            .option("inferSchema","true") \
            .schema(actors_schema) \
            .option("mode", "FAILFAST") \
            .load(filePath_corrupted) 

display(actorsWithSchema)

# Save to JSON and check with DataFrameReader

In [0]:
actorsWithSchema.write.mode("overwrite") \
    .json("dbfs:/FileStore/tables/Files/actors.json")

actorsWithSchema.show()

+-------------+--------+-----------+---------------+----------+--------------------+
|imdb_title_id|ordering|imb_name_id|       category|       job|          characters|
+-------------+--------+-----------+---------------+----------+--------------------+
|    tt0000009|    null|  nm0063086|        actress|      null|[Miss Geraldine H...|
|    tt0000009|    null|  nm0183823|          actor|      null|      [Mr. Hamilton]|
|    tt0000009|    null|  nm1309758|          actor|      null|[Chauncey Depew -...|
|    tt0000009|    null|  nm0085156|       director|      null|                null|
|    tt0000574|    null|  nm0846887|        actress|      null|        [Kate Kelly]|
|    tt0000574|    null|  nm0846894|          actor|      null|     [School Master]|
|    tt0000574|    null|  nm3002376|          actor|      null|        [Steve Hart]|
|    tt0000574|    null|  nm0170118|        actress|      null|                null|
|    tt0000574|    null|  nm0846879|       director|      null|  

In [0]:
read_df = spark.read.format("json").load("dbfs:/FileStore/tables/Files/actors.json")
display(read_df)

category,characters,imb_name_id,imdb_title_id,job
actor,[Tom],nm6005417,tt3249124,
director,,nm1871431,tt3249124,
producer,,nm1862032,tt3249124,producer
composer,,nm4261282,tt3249124,
production_designer,,nm1677303,tt3249124,
actor,[Sebastian],nm2946712,tt3249158,
actress,[Miranda],nm2578315,tt3249158,
actor,[Jacinto],nm7033017,tt3249158,
actor,[Rosa],nm7033021,tt3249158,
director,,nm1848095,tt3249158,
