In [0]:
%fs ls /public/retail_db

path,name,size,modificationTime
dbfs:/public/retail_db/README.md,README.md,806,1687612223000
dbfs:/public/retail_db/categories/,categories/,0,1687612246000
dbfs:/public/retail_db/create_db.sql,create_db.sql,10303297,1687612243000
dbfs:/public/retail_db/create_db_tables_pg.sql,create_db_tables_pg.sql,1748,1687612222000
dbfs:/public/retail_db/customers/,customers/,0,1687612218000
dbfs:/public/retail_db/departments/,departments/,0,1687612222000
dbfs:/public/retail_db/load_db_tables_pg.sql,load_db_tables_pg.sql,10297372,1687612251000
dbfs:/public/retail_db/order_items/,order_items/,0,1687612223000
dbfs:/public/retail_db/orders/,orders/,0,1687612243000
dbfs:/public/retail_db/products/,products/,0,1687612221000


In [0]:
%fs ls /public/retail_db/orders

path,name,size,modificationTime
dbfs:/public/retail_db/orders/part-00000,part-00000,2999944,1687612246000


In [0]:
schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

In [0]:
orders = spark.read.schema(schema).csv('/public/retail_db/orders')

In [0]:
orders.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [0]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [0]:
orders.dtypes

Out[12]: [('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

In [0]:
%fs ls /public/retail_db_json

path,name,size,modificationTime
dbfs:/public/retail_db_json/categories/,categories/,0,1687613147000
dbfs:/public/retail_db_json/create_db_tables_pg.sql,create_db_tables_pg.sql,1748,1687613137000
dbfs:/public/retail_db_json/customers/,customers/,0,1687613134000
dbfs:/public/retail_db_json/departments/,departments/,0,1687613137000
dbfs:/public/retail_db_json/order_items/,order_items/,0,1687613138000
dbfs:/public/retail_db_json/orders/,orders/,0,1687613143000
dbfs:/public/retail_db_json/products/,products/,0,1687613136000


In [0]:
%fs ls /public/retail_db_json/orders

path,name,size,modificationTime
dbfs:/public/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674,part-r-00000-990f5773-9005-49ba-b670-631286032674,7477339,1687613147000


In [0]:
orders = spark.read.json('/public/retail_db_json/orders')

In [0]:
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [0]:
orders.show()

In [0]:
import getpass
user = getpass.getuser()

# gives user name in os level

In [0]:
input_dir = '/public/retail_db_json'
output_dir = '/user/root/retail_db_parquet'

# convert files in input_dir to parquet in output_dir

# dbutils.fs.ls("path") - it exposes all the files as a list in that path

print(dbutils.fs.ls(input_dir))
print("##"*60)

for file_details in dbutils.fs.ls(input_dir):
    if not file_details.path.endswith("sql"):
        print(f'Convert data in {file_details.path} from json to parquet',end="\n")
        data_set_dir = file_details.path.split("/")[-2]
        df = spark.read.json(file_details.path)
        df.write.parquet(f'{output_dir}/{data_set_dir}',mode="overwrite")

[FileInfo(path='dbfs:/public/retail_db_json/categories/', name='categories/', size=0, modificationTime=1687613147000), FileInfo(path='dbfs:/public/retail_db_json/create_db_tables_pg.sql', name='create_db_tables_pg.sql', size=1748, modificationTime=1687613137000), FileInfo(path='dbfs:/public/retail_db_json/customers/', name='customers/', size=0, modificationTime=1687613134000), FileInfo(path='dbfs:/public/retail_db_json/departments/', name='departments/', size=0, modificationTime=1687613137000), FileInfo(path='dbfs:/public/retail_db_json/order_items/', name='order_items/', size=0, modificationTime=1687613138000), FileInfo(path='dbfs:/public/retail_db_json/orders/', name='orders/', size=0, modificationTime=1687613143000), FileInfo(path='dbfs:/public/retail_db_json/products/', name='products/', size=0, modificationTime=1687613136000)]
########################################################################################################################
Convert data in dbfs:/public/retail

In [0]:
dbutils.fs.ls(output_dir+"/orders")
# without coalesce(1) , we may see multiple parquet files

Out[40]: [FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1688738978000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_committed_5387188075219294211', name='_committed_5387188075219294211', size=430, modificationTime=1688738977000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_committed_8374403232092277263', name='_committed_8374403232092277263', size=222, modificationTime=1688738894000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_started_5387188075219294211', name='_started_5387188075219294211', size=0, modificationTime=1688738977000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_started_8374403232092277263', name='_started_8374403232092277263', size=0, modificationTime=1688738893000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/part-00000-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-41-1-c000.snappy.parquet', name='part-00000-tid-538718807521

In [0]:
spark.read.parquet(output_dir+"/orders").show()

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|         

In [0]:
## Convert comma separated Files to pipe separated files

In [0]:
input_dir = "/public/retail_db"
output_dir = f"/user/root/retail_db_pipe"

dbutils.fs.ls(input_dir)

print("##"*60)

for file_details in dbutils.fs.ls(input_dir):
    if not (file_details.path.endswith("sql") or ".md" in file_details):
        print(f'Convert {file_details.path} data to pipe separated ',end="\n")
        data_set_dir = file_details.path.split("/")[-2]
        df = spark.read.json(file_details.path)
        # use mode to overwrite the data
        df.coalesce(1).write.mode("overwrite").csv(f'{output_dir}/{data_set_dir}',sep="|")

########################################################################################################################
Convert dbfs:/public/retail_db/README.md data to pipe separated 


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1187990605708950>:14[0m
[1;32m     12[0m df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mjson(file_details[38;5;241m.[39mpath)
[1;32m     13[0m [38;5;66;03m# use mode to overwrite the data[39;00m
[0;32m---> 14[0m df[38;5;241m.[39mcoalesce([38;5;241m1[39m)[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39mcsv([38;5;124mf[39m[38;5;124m'[39m[38;5;132;01m{[39;00moutput_dir[38;5;132;01m}[39;00m[38;5;124m/[39m[38;5;132;01m{[39;00mdata_set_dir[38;5;132;01m}[39;00m[38;5;124m'[39m,sep[38;5;241m=[39m[38;5;124m"[39m[38;5;124m|[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **k

In [0]:
schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

orders = spark.read.schema(schema).csv(f"/user/root/retail_db_pipe/orders",sep="|")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1187990605708951>:8[0m
[1;32m      1[0m schema [38;5;241m=[39m [38;5;124m"""[39m
[1;32m      2[0m [38;5;124m    order_id INT,[39m
[1;32m      3[0m [38;5;124m    order_date TIMESTAMP,[39m
[1;32m      4[0m [38;5;124m    order_customer_id INT,[39m
[1;32m      5[0m [38;5;124m    order_status STRING[39m
[1;32m      6[0m [38;5;124m"""[39m
[0;32m----> 8[0m orders [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mschema(schema)[38;5;241m.[39mcsv([38;5;124mf[39m[38;5;124m"[39m[38;5;124m/user/root/retail_db_pipe/orders[39m[38;5;124m"[39m,sep[38;5;241m=[39m[38;5;124m"[39m[38;5;124m|[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m

In [0]:

type(spark)

Out[57]: pyspark.sql.session.SparkSession

In [0]:
type(spark.read)

Out[58]: pyspark.sql.readwriter.DataFrameReader

In [0]:
# * Reading files using direct APIs such as csv, json, etc under spark.read.
# * Reading files using format and load under spark.read.
# * Specifying options as arguments as well as using functions such as option and options.
# * Supported file formats.
#         csv
#         text
#         json
#         parquet
#         orc
# * Other common file formats.
#         xml
#         avro
# * Important file formats for certification - csv, json, parquet
# * Reading compressed files

In [0]:
# Check if the files are compressed (gz, snappy, bz2, etc). Most common ones are gz and snappy.

# Understand the file format (text, json, avro, parquet, orc, etc). Sometimes files will not have extensions.

# If files does not have extensions, make sure to confirm the details by going through the tech spec or by opening the file.

# We will get tech specs from our leads or architects while working on real world projects.

# If the files are of text file format, check if the data is delimited or separated by a specific character.

# Use appropriate API under spark.read to read the data.

# we dont have api methods to read avro files directly.

In [0]:
%fs ls /public/retail_db/orders

path,name,size,modificationTime
dbfs:/public/retail_db/orders/part-00000,part-00000,2999944,1687612246000


In [0]:
# the above file is in text mode
spark.read.text("/public/retail_db/orders").show(truncate=False)

+---------------------------------------------+
|value                                        |
+---------------------------------------------+
|1,2013-07-25 00:00:00.0,11599,CLOSED         |
|2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT  |
|3,2013-07-25 00:00:00.0,12111,COMPLETE       |
|4,2013-07-25 00:00:00.0,8827,CLOSED          |
|5,2013-07-25 00:00:00.0,11318,COMPLETE       |
|6,2013-07-25 00:00:00.0,7130,COMPLETE        |
|7,2013-07-25 00:00:00.0,4530,COMPLETE        |
|8,2013-07-25 00:00:00.0,2911,PROCESSING      |
|9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT |
|10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT|
|11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW  |
|12,2013-07-25 00:00:00.0,1837,CLOSED         |
|13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT|
|14,2013-07-25 00:00:00.0,9842,PROCESSING     |
|15,2013-07-25 00:00:00.0,2568,COMPLETE       |
|16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT|
|17,2013-07-25 00:00:00.0,2667,COMPLETE       |
|18,2013-07-25 00:00:00.0,1205,CLOSED   

In [0]:
spark.read.json("/public/retail_db_json/orders").show()

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|         

In [0]:
%fs ls /user/root/retail_db_parquet/orders

path,name,size,modificationTime
dbfs:/user/root/retail_db_parquet/orders/_SUCCESS,_SUCCESS,0,1688738978000
dbfs:/user/root/retail_db_parquet/orders/_committed_5387188075219294211,_committed_5387188075219294211,430,1688738977000
dbfs:/user/root/retail_db_parquet/orders/_committed_8374403232092277263,_committed_8374403232092277263,222,1688738894000
dbfs:/user/root/retail_db_parquet/orders/_started_5387188075219294211,_started_5387188075219294211,0,1688738977000
dbfs:/user/root/retail_db_parquet/orders/_started_8374403232092277263,_started_8374403232092277263,0,1688738893000
dbfs:/user/root/retail_db_parquet/orders/part-00000-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-41-1-c000.snappy.parquet,part-00000-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-41-1-c000.snappy.parquet,294544,1688738977000
dbfs:/user/root/retail_db_parquet/orders/part-00001-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-42-1-c000.snappy.parquet,part-00001-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-42-1-c000.snappy.parquet,241619,1688738977000


In [0]:
# We can read the data from CSV files into Spark Data Frame using multiple approaches.

# Approach 1: spark.read.csv('path_to_folder')

# Approach 2: spark.read.format('csv').load('path_to_folder')

# We can explicitly specify the schema as string or using StructType.

# We can also read the data which is delimited or separated by other characters than comma.

# If the files have header we can create the Data Frame with schema by using options such as header and inferSchema. It will pick column names from the header while data types will be inferred based on the data.

# If the files does not have header we can create the Data Frame with schema by passing column names using toDF and by using inferSchema option.

In [0]:
# Approach 1

# Default behavior
# It will delimit the data using comma as separator

# Column names will be system generated
# All the fields will be of type strings
orders = spark.read.csv('/public/retail_db/orders')
     
orders.columns
# All these column names aare system generated names.
# All the fields will be of type strings

Out[11]: ['_c0', '_c1', '_c2', '_c3']

In [0]:
help(spark.read.schema)

# The functions that are available on spark.read are also available on spark.read.schema

Help on method schema in module pyspark.sql.readwriter:

schema(schema: Union[pyspark.sql.types.StructType, str]) -> 'DataFrameReader' method of pyspark.sql.readwriter.DataFrameReader instance
    Specifies the input schema.
    
    Some data sources (e.g. JSON) can infer the input schema automatically from data.
    By specifying the schema here, the underlying data source can skip the schema
    inference step, and thus speed up data loading.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Support Spark Connect.
    
    Parameters
    ----------
    schema : :class:`pyspark.sql.types.StructType` or str
        a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
        (For example ``col0 INT, col1 DOUBLE``).
    
    Examples
    --------
    >>> spark.read.schema("col0 INT, col1 DOUBLE")
    <...readwriter.DataFrameReader object ...>
    
    Specify the schema with reading a CSV file.
    
    >>> import tempfile
    >>> with temp

In [0]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[int, str, NoneType] = None, maxCharsPerColumn: Union[int, str, NoneType] = None, maxMalformedLogPerPartition: Union[int, str, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

In [0]:
help(spark.read.format("csv").load)

# U can pass schema to load function

# help(spark.read.format("json").load)
# help(spark.read.format("text").load)

Help on method load in module pyspark.sql.readwriter:

load(path: Union[str, List[str], NoneType] = None, format: Optional[str] = None, schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, **options: 'OptionalPrimitiveType') -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
    Loads data from a data source and returns it as a :class:`DataFrame`.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Support Spark Connect.
    
    Parameters
    ----------
    path : str or list, optional
        optional string or a list of string for file-system backed data sources.
    format : str, optional
        optional string for format of the data source. Default to 'parquet'.
    schema : :class:`pyspark.sql.types.StructType` or str, optional
        optional :class:`pyspark.sql.types.StructType` for the input schema
        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
    **options : dict
        all other stri

In [0]:
schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

spark.read.schema(schema).csv("/public/retail_db/orders").show()
# spark.read.csv("/public/retail_db/orders",schema=schema).show()
#spark.read.format("csv").load("/public/retail_db/orders").show()

# with load and csv, we can mention the path of the data

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [0]:
spark.read.csv("/public/retail_db/orders",schema=schema).show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType


help(StructField)

Help on class StructField in module pyspark.sql.types:

class StructField(DataType)
 |  StructField(name: str, dataType: pyspark.sql.types.DataType, nullable: bool = True, metadata: Optional[Dict[str, Any]] = None)
 |  
 |  A field in :class:`StructType`.
 |  
 |  Parameters
 |  ----------
 |  name : str
 |      name of the field.
 |  dataType : :class:`DataType`
 |      :class:`DataType` of the field.
 |  nullable : bool, optional
 |      whether the field can be null (None) or not.
 |  metadata : dict, optional
 |      a dict from string to simple type that can be toInternald to JSON automatically
 |  
 |  Examples
 |  --------
 |  >>> from pyspark.sql.types import StringType, StructField
 |  >>> (StructField("f1", StringType(), True)
 |  ...      == StructField("f1", StringType(), True))
 |  True
 |  >>> (StructField("f1", StringType(), True)
 |  ...      == StructField("f2", StringType(), True))
 |  False
 |  
 |  Method resolution order:
 |      StructField
 |      DataType
 |    

In [0]:
schema = StructType([
    StructField('order_id', IntegerType()),
    StructField('order_date', TimestampType()),
    StructField('order_customer_id', IntegerType()),
    StructField('order_status', StringType())
])


spark.read.schema(schema).csv('/public/retail_db/orders').show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [0]:
type(schema)
# it is of type structype

Out[8]: pyspark.sql.types.StructType

In [0]:

columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']
     
spark.read.option('inferSchema', True).csv('/public/retail_db/orders').dtypes

# The column names from above are system generated ones
     


help(spark.read.option('inferSchema', True).csv('/public/retail_db/orders').toDF)

Help on method toDF in module pyspark.sql.dataframe:

toDF(*cols: str) -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Returns a new :class:`DataFrame` that with new specified column names
    
    .. versionchanged:: 3.4.0
        Support Spark Connect.
    
    Parameters
    ----------
    *cols : tuple
        a tuple of string new column name. The length of the
        list needs to be the same as the number of columns in the initial
        :class:`DataFrame`
    
    Returns
    -------
    :class:`DataFrame`
        DataFrame with new column names.
    
    Examples
    --------
    >>> df = spark.createDataFrame([(14, "Tom"), (23, "Alice"),
    ...     (16, "Bob")], ["age", "name"])
    >>> df.toDF('f1', 'f2').show()
    +---+-----+
    | f1|   f2|
    +---+-----+
    | 14|  Tom|
    | 23|Alice|
    | 16|  Bob|
    +---+-----+



In [0]:
# when we put inferschem=True, it would actually get datatype of each column by going to each and every field in entire data

spark.read.option('inferSchema', True).csv('/public/retail_db/orders').toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

# in above, by passing columns , it uses thosee columns in the dataframe.

# or, in below, we use * to upack the columns     

# spark.read.option('inferSchema', True).csv('/public/retail_db/orders').toDF(*columns)
     


spark.read.csv('/public/retail_db/orders', inferSchema=True).toDF(*columns)
     
# Another method other than "option" to use inferschema

df = spark.read.csv('/public/retail_db/orders', inferSchema=True).toDF(*columns)
df.show()



# when we put inferschem=True, it would actually get datatype of each column by going to each and every field in entire data
     

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [0]:
df.dtypes

Out[14]: [('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

In [0]:
spark.read.csv("/public/retail_db/orders",sep=",").show()

# or
# spark.read.scehma(schema).csv("path",sep=",").show()

# if u use wrong delimiter, it gives null values as o/p

+---+--------------------+-----+---------------+
|_c0|                 _c1|  _c2|            _c3|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
|  4|2013-07-25 00:00:...| 8827|         CLOSED|
|  5|2013-07-25 00:00:...|11318|       COMPLETE|
|  6|2013-07-25 00:00:...| 7130|       COMPLETE|
|  7|2013-07-25 00:00:...| 4530|       COMPLETE|
|  8|2013-07-25 00:00:...| 2911|     PROCESSING|
|  9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:...|  918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:...| 1837|         CLOSED|
| 13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:...| 9842|     PROCESSING|
| 15|2013-07-25 00:00:...| 2568|       COMPLETE|
| 16|2013-07-25 00:00:...| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:...| 2667|       COMPLETE|
| 18|2013-07-25 00:0

In [0]:
# We can pass the options using different ways while creating the Data Frame.

# Using key word arguments as part of APIs. We can use key word arguments as part of load as well as direct API (csv).

# spark.read.option - we have to use multiple times for multiple settings

# spark.read.options - we can use all settings at a time in options

# If key in the option is incorrect then the options will be ignored.
# Depending up on the API based on the file format the options as well as arguments vary.

In [0]:
import getpass
username = getpass.getuser()

# Default behavior


# It will delimit the data using comma as separator
# Column names will be system generated
# All the fields will be of type strings as didnt specify schema and as inferschema=None by default

orders = spark.read.csv(f'/public/retail_db/orders')

In [0]:
orders.show(truncate=False)

orders.dtypes # with inferschema, we get datatype from column in dataframe

+---+---------------------+-----+---------------+
|_c0|_c1                  |_c2  |_c3            |
+---+---------------------+-----+---------------+
|1  |2013-07-25 00:00:00.0|11599|CLOSED         |
|2  |2013-07-25 00:00:00.0|256  |PENDING_PAYMENT|
|3  |2013-07-25 00:00:00.0|12111|COMPLETE       |
|4  |2013-07-25 00:00:00.0|8827 |CLOSED         |
|5  |2013-07-25 00:00:00.0|11318|COMPLETE       |
|6  |2013-07-25 00:00:00.0|7130 |COMPLETE       |
|7  |2013-07-25 00:00:00.0|4530 |COMPLETE       |
|8  |2013-07-25 00:00:00.0|2911 |PROCESSING     |
|9  |2013-07-25 00:00:00.0|5657 |PENDING_PAYMENT|
|10 |2013-07-25 00:00:00.0|5648 |PENDING_PAYMENT|
|11 |2013-07-25 00:00:00.0|918  |PAYMENT_REVIEW |
|12 |2013-07-25 00:00:00.0|1837 |CLOSED         |
|13 |2013-07-25 00:00:00.0|9149 |PENDING_PAYMENT|
|14 |2013-07-25 00:00:00.0|9842 |PROCESSING     |
|15 |2013-07-25 00:00:00.0|2568 |COMPLETE       |
|16 |2013-07-25 00:00:00.0|7276 |PENDING_PAYMENT|
|17 |2013-07-25 00:00:00.0|2667 |COMPLETE       |


In [0]:
orders = spark. \
    read. \
    csv(
        f'/public/retail_db/orders',
        sep=',',
        header=None,
        inferSchema=True
    ). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

# we used toDF - to set column names in dataframe

# similarly with load

orders = spark. \
    read. \
    format('csv'). \
    load(
        f'/public/retail_db/orders',
        sep=',',
        header=None,
        inferSchema=True
    ). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
     

In [0]:
## with option
help(spark.read.option)
# it returns a dataframe reader

Help on method option in module pyspark.sql.readwriter:

option(key: str, value: 'OptionalPrimitiveType') -> 'DataFrameReader' method of pyspark.sql.readwriter.DataFrameReader instance
    Adds an input option for the underlying data source.
    
    .. versionadded:: 1.5.0
    
    .. versionchanged:: 3.4.0
        Support Spark Connect.
    
    Parameters
    ----------
    key : str
        The key for the option to set.
    value
        The value for the option to set.
    
    Examples
    --------
    >>> from pyspark.sql import SparkSession
    >>> spark = SparkSession.builder.master("local").getOrCreate()
    >>> spark.read.option("key", "value")
    <...readwriter.DataFrameReader object ...>
    
    Specify the option 'nullValue' with reading a CSV file.
    
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as d:
    ...     # Write a DataFrame into a CSV file
    ...     df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}])
    ...     df.wri

In [0]:


orders = spark. \
    read. \
    option('sep', ','). \
    option('header', None). \
    option('inferSchema', True). \
    csv(f'/public/retail_db/orders'). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

# with options - u can use all settings at a time

orders = spark. \
    read. \
    options(sep=',', header=None, inferSchema=True). \
    csv(f'/public/retail_db/orders'). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

# or 

options = {
    'sep': ',',
    'header': None,
    'inferSchema': True
}
     

orders = spark. \
    read. \
    options(**options). \
    csv(f'/public/retail_db/orders'). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

# or use format=csv and use path in load (f - l)


orders = spark. \
    read. \
    options(**options). \
    format('csv'). \
    load(f'/public/retail_db/orders'). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

In [0]:
help(spark.read.json)

Help on method json in module pyspark.sql.readwriter:

json(path: Union[str, List[str], pyspark.rdd.RDD[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, primitivesAsString: Union[bool, str, NoneType] = None, prefersDecimal: Union[bool, str, NoneType] = None, allowComments: Union[bool, str, NoneType] = None, allowUnquotedFieldNames: Union[bool, str, NoneType] = None, allowSingleQuotes: Union[bool, str, NoneType] = None, allowNumericLeadingZero: Union[bool, str, NoneType] = None, allowBackslashEscapingAnyCharacter: Union[bool, str, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, multiLine: Union[bool, str, NoneType] = None, allowUnquotedControlChars: Union[bool, str, NoneType] = None, lineSep: Optional[str] = None, samplingRatio: Union[str, float, NoneType] = None, dropFieldIfAllNull: Union[bool, str, NoneType] = None, encoding: Optional[str] = 

In [0]:
df = spark.read.json("/public/retail_db_json/orders")

# or 

df = spark.read.format("json").load("/public/retail_db_json/orders")

# with josn, the inferschema= true by default.so u get all datatypes of columns
print("types are ",df.dtypes)
df.inputFiles() # - gives details about file names

types are  [('order_customer_id', 'bigint'), ('order_date', 'string'), ('order_id', 'bigint'), ('order_status', 'string')]
Out[44]: ['dbfs:/public/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674']

In [0]:
df = spark.read.csv("/public/retail_db/orders")
df.dtypes
# # with csv, the inferschema= None by default.so u get string type for all columns

Out[41]: [('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'string')]

In [0]:
spark.read.json('/public/retail_db_json/orders').dtypes
     


schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""
     

# This will run faster as data will not be read to infer the schema
spark.read.schema(schema).json('/public/retail_db_json/orders').show()

#  spark.read.json('/public/retail_db_json/orders', schema=schema).show()
     

In [0]:
# If inferSchema is used entire data need to be read to infer the schema accurately while creating the Data Frame.

# If the data size is too big then additional time will be spent to infer the schema.

# When we explicitly specify the schema, data will not be read while creating the Data Frame.

# As we have seen we should be able to explicitly specify the schema using string or StructType.

# Inferring Schema will come handy to quickly understand the structure of the data as part of proof of concepts as well as design.
# Schema will be inferred by default for files of type JSON, Parquet and ORC. Column names and data types will be inferred using metadata that will be associated with these types of files.

# Inferring the schema on CSV files will create data frames with system generated column names. If inferSchema is used, 

# then the data frame will determine the data types. If the files contain header, then column names can be inherited using it. If not, we need to explicitly pass the columns using toDF.

In [0]:
help(spark.read.parquet)

Help on method parquet in module pyspark.sql.readwriter:

parquet(*paths: str, **options: 'OptionalPrimitiveType') -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
    Loads Parquet files, returning the result as a :class:`DataFrame`.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Support Spark Connect.
    
    Parameters
    ----------
    paths : str
    
    Other Parameters
    ----------------
    **options
        For the extra options, refer to
        `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option>`_
        for the version you use.
    
        .. # noqa
    
    Examples
    --------
    Write a DataFrame into a Parquet file and read it back.
    
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as d:
    ...     # Write a DataFrame into a Parquet file
    ...     spark.createDataFrame(
    ...         [{"age": 100, "name": "Hyukjin Kwon"}]
    .

In [0]:
%fs ls /user/root/retail_db_parquet/orders

# the parquet files are compressed by snappy algorithm

In [0]:
df = spark.read.parquet(f'/user/{username}/retail_db_parquet/orders')

# or

df = spark.read.format('parquet').load(f'/user/{username}/retail_db_parquet/orders')
     

df.inputFiles() # gives the file names that are used for dataframe creation

Out[46]: ['dbfs:/user/root/retail_db_parquet/orders/part-00000-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-41-1-c000.snappy.parquet',
 'dbfs:/user/root/retail_db_parquet/orders/part-00001-tid-5387188075219294211-e49e393e-1e29-4d5c-9a47-e0d62d2e3578-42-1-c000.snappy.parquet']

In [0]:
# Schema will be inferred by default for files of type JSON, Parquet and ORC. Column names and data types will be inferred using metadata that will be associated with these types of files.

df = spark.read.parquet(f'/user/{username}/retail_db_parquet/orders')

df.dtypes

# This way takes time as inferschema=True and has to go to each and every line

Out[47]: [('order_customer_id', 'bigint'),
 ('order_date', 'string'),
 ('order_id', 'bigint'),
 ('order_status', 'string')]

In [0]:

schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

# This will run faster as data will not be read to infer the schema
# Fail to convert order_id as well as order_customer_id as int
spark.read.schema(schema).parquet(f'/user/{username}/retail_db_parquet/orders').show()
     
# But got error as INT is considered as INT64 in spark
# whereas BIGINT is considered as IntergerType

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1511183447879623>:10[0m
[1;32m      1[0m schema [38;5;241m=[39m [38;5;124m"""[39m
[1;32m      2[0m [38;5;124m    order_id INT,[39m
[1;32m      3[0m [38;5;124m    order_date TIMESTAMP,[39m
[1;32m      4[0m [38;5;124m    order_customer_id INT,[39m
[1;32m      5[0m [38;5;124m    order_status STRING[39m
[1;32m      6[0m [38;5;124m"""[39m
[1;32m      8[0m [38;5;66;03m# This will run faster as data will not be read to infer the schema[39;00m
[1;32m      9[0m [38;5;66;03m# Fail to convert order_id as well as order_customer_id as int[39;00m
[0;32m---> 10[0m spark[38;5;241m.[39mread[38;5;241m.[39mschema(schema)[38;5;241m.[39mparquet([38;5;124mf[39m[38;5;124m'[39m[38;5;124m/user/[39m[38;5;132;01m{[39;00musername[38;5;132;01m}[39;00m[38;5;124m/retail_d

In [0]:
# Now it fails on order_date as it is string type not timestamp

schema = """
    order_id BIGINT,
    order_date TIMESTAMP,
    order_customer_id BIGINT,
    order_status STRING
"""
     

# Fail to type cast order_date to timestamp. In the files, it is represented as string
spark.read.schema(schema).parquet(f'/user/{username}/retail_db_parquet/orders').show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1511183447879624>:12[0m
[1;32m      3[0m schema [38;5;241m=[39m [38;5;124m"""[39m
[1;32m      4[0m [38;5;124m    order_id BIGINT,[39m
[1;32m      5[0m [38;5;124m    order_date TIMESTAMP,[39m
[1;32m      6[0m [38;5;124m    order_customer_id BIGINT,[39m
[1;32m      7[0m [38;5;124m    order_status STRING[39m
[1;32m      8[0m [38;5;124m"""[39m
[1;32m     11[0m [38;5;66;03m# Fail to type cast order_date to timestamp. In the files, it is represented as string[39;00m
[0;32m---> 12[0m spark[38;5;241m.[39mread[38;5;241m.[39mschema(schema)[38;5;241m.[39mparquet([38;5;124mf[39m[38;5;124m'[39m[38;5;124m/user/[39m[38;5;132;01m{[39;00musername[38;5;132;01m}[39;00m[38;5;124m/retail_db_parquet/orders[39m[38;5;124m'[39m)[38;5;241m.[39mshow()

File [0;32m/d

In [0]:
schema = """
    order_id BIGINT,
    order_date STRING,
    order_customer_id BIGINT,
    order_status STRING
"""
     

spark.read.parquet(f'/user/{username}/retail_db_parquet/orders', schema=schema).show(5)

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
+-----------------+--------------------+--------+---------------+
only showing top 5 rows



In [0]:
from pyspark.sql.types import StructType, StructField, LongType, StringType
     
# u can use LongType or IntegerType for order_id and order_customer_id

schema = StructType([
    StructField('order_id', LongType()),
    StructField('order_date', StringType()),
    StructField('order_customer_id', LongType()),
    StructField('order_status', StringType())
])
     

spark.read.schema(schema).parquet(f'/user/{username}/retail_db_parquet/orders').show()
     


+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|             1837|         CLOSED|
|      13|

In [0]:
!pwd

/databricks/driver


In [0]:
%fs ls /databricks/driver