### Pipenv 

In [None]:
# PySpark project 
# MacBook or Windows machine - python 3.10 is installed ( global version )

# I created one project - retailproject ( used same global version of python 3.10 )
# required packages -
# pyspark 3.2.1
# pytest ( some version )

# Another project - lendingclub project 
# incase it requires -
# python 3.8
# pyspark 3.5
# different version of pytest 

# so what to do if every project has to use the globally installed version of python ?
# then we need to install another version of python ( 3.8 ) globally , which will impact the other projects

# so what is the solution ?
# pipenv --> pip + venv 

# venv --> creates an isolated virtual environment for your projects 
# five projects in your system:
# virtual env 1 for project 1 --> python 3.6 , pyspark 3.2 etc.,
# virtual env 2 for project 2 --> python 3.8 , pyspark 3.4 etc.,

# pip --> way to install packages ( package manager )

# each project should have own set of python , own set of packages ( few global installations are common )

# old way 
# .venv 
# create virtual environment 
# activate virtual environment 
# install packages required 

# new way 
# pip install pipenv            --> run this first ; pipenv is installed with pip ( the package manager ) , so python with pip must already be installed globally 
# pipenv install pytest         --> creates the virtual environment ( if it does not exist yet ) and installs the package ; it uses the global version of python , if a different version is required then change it in the Pipfile 
# pipenv shell                  --> to activate the virtual environment 
# Pipfile                       --> similar to requirements.txt 
# Pipfile.lock                  --> has the exact version of each package along with its sha256 hashes 
# exit                          --> to come out of the environment 
# pipenv run python             --> to run python inside the environment without activating it 
# pipenv install pytest --dev   --> to install the package as a development-only dependency ( listed under [dev-packages] in the Pipfile ) 
# pipenv uninstall pytest       --> to uninstall pytest package
#   
# pipenv --rm                   --> to remove the environment , but the Pipfile and Pipfile.lock are still present 
# pipenv install                --> to create an environment from the Pipfile ( a Pipfile must be present in the project directory ) 

# if we need a specific version of python or of a package , change it in the Pipfile , run pipenv --rm to remove the environment and then run pipenv install to create a new environment with the required versions 

# to pin a specific version of a package , edit the Pipfile ( e.g. "==0.0.1" as the package version in place of "*" )
# the python version can also be changed in the Pipfile ; then recreate the environment ( pipenv --rm followed by pipenv install )
# if the python version mentioned in the Pipfile is not found on the system , pipenv tries to install it ( this needs pyenv or asdf to be available )  
# pyenv is used to install a specific version of python ( pyenv install 3.9 , ./path/pyenv install 3.9 )

# when setting up the test or production environment for the same project , use Pipfile.lock as it has the exact versions of python and of the python packages 

### Project Structuring and execution 

In [None]:
### Project Structuring and execution 
# Retail Analysis
# Lets say we have :
# Customers.csv ( customer_id ,customer_fname,customer_lname,username ,password , address, city , state , pincode  )
# orders.csv ( order_id , order_date , customer_id , order_status )
# Problem statement -  Need to find the number of closed orders for each state 

# steps:
# create orders dataframe
# create customers dataframe
# filter the orders dataframe 
# join the filtered orders dataframe with customers dataframe 
# group by on the state to get the count 

# to implement these steps in a notebook it is easy and simple 

# but how to do this in structured format , so that the code is modular and follows industry standards 

In [None]:
# root_of_project
#     - configs ( to keep the configurations )
#         - application.conf ( to keep the configs related to the project like file_path etc.,)
#         - pyspark.conf ( to keep spark level configs like executor cores , executor memory , application name etc.,)
#     - lib ( to create the reusable libraries ) 
#         - configreader.py ( to read the configs created under configs files )
#         - datareader.py ( create the spark dataframe from data files )
#         - datamanipulation.py ( all the data manipulations , transformations ) 
#         - utils.py ( any utility functions like creating spark session, creating any folders etc.,)
#     - data ( to keep the sample data for testing , ideally data will be in datalake ) 
#         - orders.csv
#         - customers.csv 
#     - application_main.py ( entry point for the spark application , where all the functions defined are called ) 
#     - Pipfile 
#     - Pipfile.lock 

In [1]:
import configparser
from pyspark import SparkConf 


In [2]:
# Read the project-level settings (file paths etc.) of one environment into a plain dict.
config = configparser.ConfigParser()
config.read("configs/application.conf")  # read() silently skips a file that does not exist

env = "LOCAL"  # section of application.conf to load
# has_section guards against NoSectionError, so a missing file or section
# simply leaves app_conf empty instead of failing the cell.
app_conf = dict(config.items(env)) if config.has_section(env) else {}
app_conf


In [17]:
# Build a SparkConf from the key/value pairs of the TEST section of pyspark.conf.
config = configparser.ConfigParser()
config.read("configs/pyspark.conf")

pyspark_conf = SparkConf()
# setAll applies every (key, value) pair of the section in one call.
pyspark_conf.setAll(config.items("TEST"))
pyspark_conf

<pyspark.conf.SparkConf at 0x1edd019deb0>

In [None]:
# Look up a single Spark setting: the application name that was set from the TEST section of pyspark.conf.
pyspark_conf.get("spark.app.name") 

'retail-test'

In [None]:
# `config` is rebound to the pyspark.conf parser a few cells up, so read
# application.conf into its own parser here; the cell then lists the LOCAL
# application settings no matter which cell last assigned `config`.
app_config = configparser.ConfigParser()
app_config.read("configs/application.conf")
app_config.items("LOCAL")

[('customers.file.path', 'data/customers.csv'),
 ('orders.file.path', 'data/orders.csv')]

### Unit testing 

In [None]:
# Unit testing 

# Good developers write unit test cases 
# testing small pieces of code to ensure it is working fine 
# testing small unit of code 
# If the code is written in modular way then testing of each function separately 
# one unit == one function 

# many frameworks to perform unit testing 
# unittest
# pytest 

# need to write test cases as a part of best practices 

In [None]:
# which units or functions of the codes need to be tested 

# read_customers         --> if the output is 12435 then it is passed 
# read_orders            --> if the output is 68884 then it is passed 
# filter_closed_orders   --> if the count is 7556 then it is passed 
# get_app_config         --> test if the configurations are read properly 

# how to test these functions ?
# need to create a file where one can write the unit tests 

In [None]:
# in pytest the name of the file containing the unit test cases should either start with test_ or end with _test ( test_*.py or *_test.py ) 
# test_retail_proj.py
# retail_proj_test.py

In [None]:
# python -m pytest is the command to run testing 
# python -m pytest -v 

In [None]:
# initial set of functions 

# import pytest 
# from lib.utils import get_spark_session
# from lib.datareader import read_customers, read_orders 
# from lib.datamanipulation import filter_closed_orders 
# from lib.configreader import get_app_config 

# def test_read_customers(): # function should start with test    
#     spark_s=get_spark_session("LOCAL")
#     customers_count=read_customers(spark_s,"LOCAL").count()
#     assert customers_count == 12435 # assert to check the true condition 

# def test_read_orders(): # function should start with test  
#     spark_s=get_spark_session("LOCAL")
#     orders_count=read_orders(spark_s,"LOCAL").count()
#     assert orders_count == 68884 # checking on the number of records 

# def test_filtered_orders(): # function should start with test    
#     spark_s=get_spark_session("LOCAL")
#     df_=filter_closed_orders(read_orders(spark_s,"LOCAL")).count()
#     assert df_ == 7556  

# def test_app_config():
#     dict_value_returned_=get_app_config("LOCAL")
#     assert dict_value_returned_["customers.file.path"] == "data/customers.csv" # testing any one value 

In [None]:
# in the initial set of functions we can see that spark session creation is a part of each test function  
# which is not expected 
# this has to be moved into fixtures 

# fixture is to write the setup code 
# write fixtures in conftest.py

# fixtures are picked automatically from conftest.py files  

In [None]:
# setup - fixture 
# perform unit testing - define unit testing 
# teardown - releasing the resources 

In [None]:
# python -m pytest --fixtures # to get the list of fixtures 
# few fixtures will be system defined and few will be user defined 

### Fixtures to check if calculated results match the expected results 

In [None]:
# Fixtures to check if calculated results match the expected results 

# now need to create another test case 
# need to test function  "count_orders_state"
# to test this , the expected results are pre-calculated and kept in the data > test_results folder 
# now need to create the function to test in test_retail_proj.py 

In [4]:
import pandas as pd
from pathlib import Path

# Pre-calculate the per-state customer counts and store them as the expected
# results file kept under data/test_results.
expected_path = Path("data/test_results/state_aggregate.csv")

state_counts = (
    pd.read_csv("data/customers.csv")
    .groupby(["state"])["customer_id"]
    .count()
    .reset_index()
)

# to_csv does not create missing folders, so make sure the target directory exists first.
expected_path.parent.mkdir(parents=True, exist_ok=True)
state_counts.to_csv(expected_path, index=False)

### Markers 

In [None]:
# Markers 
# to label test cases use markers
# if there are hundred test cases out of which forty are for data loading , they can be marked using markers 
# using
# eg: @pytest.mark.dataloading
# python -m pytest -m datamanipulation # to run only the test cases marked as datamanipulation test cases 

# python -m pytest --markers # to get the list of markers, including system defined and user defined 

# pytest.ini # to mention the markers so that it can be used as pytest configuration 
# [pytest]
# markers =
#         datareading: test the data reading functions
#         datamanipulation: tests the data manipulation functions

# python -m pytest -m "not datamanipulation"  # to select and execute the test cases apart from datamanipulation   

# if any of the test cases is not fully developed and is still work in progress
# then can mention 
# @pytest.mark.skip("work in progress")

### Parameterised generic test cases 

In [None]:
# Parameterised generic test cases 
# In function "filter_closed_orders(orders_df)" the filtering is hardcoded to "CLOSED" ; instead of this we need to pass a generic condition , i.e. pass the status as a parameter to the function 
# defined as function "filter_closed_orders_parameterised" in datamanipulation.py by passing the status parameter 

# in test_retail_proj.py can create multiple functions for closed , complete , processing etc., but in this approach a lot of redundancy is present 
# can see functions test_filter_closed_orders_generic , test_filter_complete_orders_generic , test_filter_pending_orders_generic etc., created in test_retail_proj.py

# so to make it more efficient 
# need to define the parameters to pass using the below in test_retail_proj.py :

# @pytest.mark.parametrize(
#         "status_,count_",
#         [("COMPLETE",22900),
#          ("PENDING",7610),
#          ("PROCESSING",8275)]
# )

# which will run the test case "test_filter_orders_generic" in test_retail_proj.py once for each set of parameters 
# it passes the parameters to the test case ( function ) on which the above decorator is defined 

# for a good production application writing test cases is a very good practice 

### logging 
#### all logging related files are in "C:/Users/user/Downloads/Ultimate_bigdata/Week13/Project_Structuring_pytest_materials"

In [None]:
# logging 
# logging is required when an application is productionised  
# getting the logs 

# print statements were used to understand the flow of control and for debugging 
# few issues with print for debugging 
# 1. cannot set the logging level ( priority ) like info , warn , error , fatal etc.,
# 2. if multiple print statements are used during debugging , then manual effort is required to remove them 
# 3. print makes applications slow  

# logging is the best way to solve the issues   
# Log4j is used for logging 
# spark internally uses log4j for logging , so we can reuse the same for application logs 

# instance of log4j object can be obtained from spark session 

# in utils.py adding extra config while creating the spark session 
# new files gets created -- log4j.properties and logger.py
# modification -- application_main.py 

In [None]:
# logging levels :
# debug - verbose related information
# info - all information related things 
# warn - warning messages 
# error - error related messages
# fatal - bigger error 

# in log4j.properties file if logging level is defined as debug , all debug , info , warn , error and fatal messages are displayed 
# in log4j.properties file if logging level is defined as info , all info , warn , error and fatal messages are displayed 
# in the same way if warn --> warn , error , fatal are displayed 
# error --> error , fatal
# fatal --> fatal 

In [None]:
# target location 
# target location to log
# 1. console 
# 2. file 

# message format 
# message format to display the log 

# logging level , target location and message format are main things to consider 

#### log4j.properties file ,

In [None]:
# log4j.properties file , is placed in root 

# # root
# log4j.rootCategory=INFO, console   --> INFO - logging level , console - to print 

# # console appender
# log4j.appender.console=org.apache.log4j.ConsoleAppender  --> console appender , to console  ( in case of file as target need to have FileAppender and respective properties , if two appenders are defined then it will log in both places )
# log4j.appender.console.target=System.out
# log4j.appender.console.layout=org.apache.log4j.PatternLayout --> pattern layout to print the log
# log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n  --> print log in specific format 
#                                                 # %d -- date , %p -- priority ( log level ) , %c -- logger ( category ) name , %m -- message , %n -- newline  

# # spark configs
# log4j.logger.org.apache.spark.repl.Main=WARN
# log4j.logger.org.spark_project.jetty=WARN
# log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
# log4j.logger.org.apache.parquet=ERROR
# log4j.logger.parquet=ERROR
# log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
# log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
# log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# #user logs
# log4j.logger.retail_analysis=ERROR, console # for the respective application , retail_analysis is the project folder name ( at ERROR level only error and fatal messages of this logger are displayed )
# log4j.additivity.retail_analysis=false  # to avoid double logging 

#### logger.py   

In [None]:
# logger.py 

# under lib logger.py is placed 
# the code inside it is boiler plate 

#### logging main script    

In [None]:
# logging main script 

# spark = Utils.get_spark_session(job_run_env)          # creating spark session 

# logger = Log4j(spark) # log4j instances 

# logger.warn("Created Spark Session")                  # logging  

# orders_df = DataReader.read_orders(spark,job_run_env)

# orders_filtered = DataManipulation.filter_closed_orders(orders_df)

# customers_df = DataReader.read_customers(spark,job_run_env)

# joined_df = DataManipulation.join_orders_customers(orders_filtered,customers_df)

# aggregated_results = DataManipulation.count_orders_state(joined_df)

# aggregated_results.show(50)

# #print(aggregated_results.collect())

# logger.info("this is the end of main")                   # logging 

In [3]:
import pandas as pd

# Load the raw orders and count the orders per status; these counts are the
# numbers used as expected values in the notes above (e.g. CLOSED = 7556).
dff_ = pd.read_csv("data/orders.csv")
(
    dff_
    .groupby(["order_status"])["order_id"]
    .count()
    .reset_index()
)

Unnamed: 0,order_status,order_id
0,CANCELED,1428
1,CLOSED,7556
2,COMPLETE,22900
3,ON_HOLD,3798
4,PAYMENT_REVIEW,729
5,PENDING,7610
6,PENDING_PAYMENT,15030
7,PROCESSING,8275
8,SUSPECTED_FRAUD,1558


In [None]:
# git commands
# git-scm.com -- to download and install git ( local )
# 
# git --version
# In Windows git bash is used to write commands 
# 
# project is developed using an IDE like VS Code , PyCharm etc.,
# github.com is used to create the remote repository on GitHub ( remote )