In [None]:
# Databricks notebook source
# DBTITLE 1,Library Imports
from pyspark.sql import DataFrame
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Function to get the DataFrame based on specified location and format
"""
function: getDataFrame
  params (param name[param type])
    1) format[str] - defines the format of the DataFrame source (e.g. parquet/csv)
    2) header[str] = should include header or not (e.g. false)
    3) delimiter[str] = column delimiter to split the columns (e.g. \t)
    4) location[str] = location of the file to generate the DataFrame for (e.g. /mnt/abidatasummaryread/Intake/MSSales/MSFSALSL00/)
    5) schema[StructType] = schema of the file mentioned in location param (e.g. output generated by getTableSchema(MSFSALSL00) call)
  return DataFrame
"""

def getDataFrame(format: str, header: str, delimiter: str, location: str, schema: StructType) -> DataFrame:
  df = spark.read.format(format).option("header",header).schema(schema).option("delimiter",delimiter).load(location)
  return df


def getDataFrameParquet(location: str) -> DataFrame:
  df = spark.read.parquet(location)
  return df

# COMMAND ----------

# DBTITLE 1,Function to get the FYM{*}/FYMxx based on processing needs
"""
function: getFiscalYearRange
  params (param name[param type])
    1) param[str] - parameter to get the fiscal year ranage (e.g. d:FY21)
    first character (d/f/r) before : in d:FY21 defines if processing is (d)daily/f(FME)/r(restatement) 
    value after : in d:FY21 defines for how-many/what years needs to be considered for processing
    d: is special case and goes from CM-1 to M12
    f/r behave almost similar and are placeholders for any special scenario that may come
    f/r:FY20#FY21: process data for two complete years together. Can be used for any number of years.
        Performance of your notebook may vary based on these parameters
  return DataFrame
"""

def getFiscalYearRange(param:str) -> DataFrame:
  fy = param.replace("d:","").replace("f:","").replace("r:","")
  op = param[0:1]
  if op == "d":
    sql = """WITH CTE_Date
    AS
    (
        SELECT DISTINCT
            FiscalYearName
            , FiscalMonthName
            , FiscalMonth
            , REPLACE(CASE WHEN RelativeMonth = 'CM' THEN 'CM0' ELSE RelativeMonth END, 'CM', '') AS RelativeMonth
        FROM
           delta.`/mnt/abidatasummaryblobread/ConformedDataProd/ConformedDelta/DIM_Date/` AS d
        WHERE
            FiscalYearName = '""" + fy + """'
    )
    SELECT
        DISTINCT 
        LEFT(FiscalMonth, 4) AS FiscalYear
        , concat(RIGHT(FiscalMonth, 3),'{*}') AS FiscalMonth
        , FiscalMonth AS NameSuffix
    FROM CTE_Date
    WHERE RelativeMonth >= CASE WHEN date_format(current_date(), 'dd')  > 2 THEN -1 ELSE -2 END  
    """
  else:
    sql = """
    WITH CTE_Years
    AS
    (
      SELECT explode(split('""" + fy + """','#')) AS Year
    )
    SELECT TRIM(Year) AS FiscalYear, 'M{*}' AS FiscalMonth, TRIM(Year) AS NameSuffix
    FROM CTE_Years
    """
  df = spark.sql(sql)
  return df

# COMMAND ----------

# DBTITLE 1,Function to get the schema (StructType) based on active DYN Columns
"""
function: getTableSchema
  params (param name[param type])
    1) table[str] - table name of for which schema is required (e.g. MSFSALSL00)
        The name of the table provided in the parameter MUST match with SourceTableName column of
        [ABIVMPROSQL06\SUMMARY].STARLIGHT_00_ETL.dbo.DYN_Table table.
        The parquet at (/mnt/abidatasummary/Intake/dbo/DYNTableColumn/DYNTableColumn.parquet) is updated based on 
        MSSales intake under 01MSSalesADLSUpload schedule. Only BUS_DoNoUse = 0 and NotInSource = 0 columns are available.
  return StructType
    1) The ouput StructType is a JSON based schema translated to Spark-SQL compatible data-types based on trasnlation in the following function
        The field structure is {name:, type:, nullable:, metadata:}
"""

def getTableSchema(table:str) -> StructType:
  sql = """
  WITH CTE_Columns AS
  (
    SELECT
      DestinationColumnName
      , lower(SourceColumnDataType) AS SourceColumnDataType
      , case when SourceColumnIsNullable = 1 THEN 'True' else 'False' end Nullable
      , ColumnOrder
    FROM
      parquet.`/mnt/abidatasummary/Intake/dbo/DYNTableColumn/DYNTableColumn.parquet`
    WHERE lower(SourceTableName) = lower('""" + table + """')
  )
  SELECT
    DestinationColumnName
    , CASE 
        WHEN SourceColumnDataType = 'int' AND DestinationColumnName = 'LicenseTransactionItemId' THEN 'long'
        WHEN SourceColumnDataType = 'int' AND DestinationColumnName <> 'LicenseTransactionItemId' THEN 'integer'
        WHEN SourceColumnDataType = 'tinyint' THEN 'integer'
        WHEN SourceColumnDataType = 'bigint' THEN 'long'
        WHEN SourceColumnDataType = 'smallint' THEN 'integer'
        WHEN SourceColumnDataType = 'numeric' THEN 'decimal(19,4)'
        WHEN SourceColumnDataType = 'varchar' THEN 'string'
        WHEN SourceColumnDataType = 'nvarchar' THEN 'string'
        WHEN SourceColumnDataType = 'char' THEN 'string'
        ELSE 'string'
      END DataType
    , Nullable
  FROM
    CTE_Columns
  ORDER BY ColumnOrder
  """
  columnDF = spark.sql(sql)
  schema = ""
  for row in columnDF.rdd.collect():
    schema = schema + "{'name': '" + row.DestinationColumnName + "', 'type': '" + row.DataType + "', 'nullable':"  + row.Nullable + ", 'metadata': {}}"
  
  schema =  "{'type': 'struct', 'fields': [" + schema.replace("{}}{","{}},\n{") + "]}"
  schema = StructType.fromJson(eval(schema))
  return schema

# COMMAND ----------
  %sql
CREATE OR REPLACE TEMP VIEW vwMDSToBRE
AS
SELECT * FROM parquet.`/mnt/abidatadetail/Intake/MDS/MWDetail/MDSToBRE`;

CREATE OR REPLACE TEMP VIEW MetricName_Tbl
AS
WITH CTE AS 
   (SELECT MetricGroup,MetricName,MIN(Id) Id 
     FROM vwMDSToBRE GROUP BY  MetricGroup,MetricName)
SELECT MetricGroup,
       MetricName,
       Row_Number() over(Partition BY MetricGroup Order by Id) Rn  
  FROM CTE;

  # COMMAND ----------
  %scala
def MDSBre(MetricGroup:String) : String = 
{ 
var x = 1;
var mdsbre_sql = "CASE ";
var WhereClause = "";
var MetricName_Cnt = spark.sql("SELECT COUNT(1) FROM MetricName_Tbl WHERE MetricGroup = '"+MetricGroup+"'").first.get(0).toString(); 
var DefaultValue = spark.sql("SELECT DISTINCT DefaultValue FROM vwMDSToBRE WHERE MetricGroup = '"+MetricGroup+"'").first.get(0).toString(); 
var y = MetricName_Cnt.toInt
while(x <= y)
{          
          var WhereClause1="WHEN ";
          var MetricName = spark.sql("SELECT MetricName FROM MetricName_Tbl WHERE RN="+x+" AND MetricGroup = '"+MetricGroup+"'").first.get(0).toString();
          var M365_MinId = spark.sql("SELECT MIN(Id) MAXID FROM vwMDSToBRE WHERE MetricName = '" + MetricName+"' AND MetricGroup = '"+MetricGroup+"'").first.get(0).toString(); 
          var M365_MaxId = spark.sql("SELECT MAX(Id) MINID FROM vwMDSToBRE WHERE MetricName = '" + MetricName+"' AND MetricGroup = '"+MetricGroup+"'").first.get(0).toString(); 
          var Result = spark.sql("SELECT Result FROM vwMDSToBRE WHERE ID="+M365_MaxId + " AND MetricName = '" + MetricName+"' AND MetricGroup = '"+MetricGroup+"'").first.get(0).toString(); 
          var j = M365_MaxId.toInt
          var i = M365_MinId.toInt

          while(i <= j)
            {
                 var WhereClause2 = spark.sql("""SELECT CONCAT(FieldName , " ", Operator , " " , FilterCondition , " "  , COALESCE(LINK,"") )  FROM vwMDSToBRE WHERE ID = """+i+ " AND MetricName = '" + MetricName+"' AND MetricGroup = '"+MetricGroup+"'").first.get(0).toString()
                 WhereClause1 = WhereClause1 + " " +WhereClause2 
             i = i+1

            }
         WhereClause = WhereClause + " " +WhereClause1 + " THEN '" +Result + "'"
  x = x+1

}
mdsbre_sql = mdsbre_sql + WhereClause +  " ELSE '" + DefaultValue + "' END"
return mdsbre_sql
}

In [1]:
import PyPDF2
import pandas as pd
#create file object variable
#opening method will be rb
pdffileobj=open(r'C:\Users\Python_learning\\lm-tcfd-report-2021-1.pdf','rb')
 
#create reader variable that will read the pdffileobj
pdfreader=PyPDF2.PdfFileReader(pdffileobj)
 
#This will store the number of pages of this pdf file
x=pdfreader.numPages
 
#create a variable that will select the selected number of pages
pageobj=pdfreader.getPage(x-1)
 
#(x+1) because python indentation starts with 0.
#create text variable which will store all text datafrom pdf file
text=pageobj.extractText()
 
#save the extracted data from pdf to a txt file
#we will use file handling here
#dont forget to put r before you put the file path
#go to the file location copy the path by right clicking on the file
#click properties and copy the location path and paste it here.
#put "\\your_txtfilename"
file1=open(r"C:\Users\Python_learning\\lm-tcfd-report-2021-1.txt")
file1.writelines(text)

Xref table not zero-indexed. ID numbers for objects will be corrected.


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Python_learning\\\\lm-tcfd-report-2021-1.txt'

In [28]:
import PyPDF2
from PyPDF2 import PdfMerger, PdfReader, PdfWriter
import pandas as pd
import pikepdf
pdf = pikepdf.Pdf.open(r'C:/Users/v-shivampa/omy.pdf')
#create file object variable
#opening method will be rb
#pdffileobj1=open(r'C:/Users/v-shivampa/omy.pdf','rb')

In [38]:
#pdfreader=PyPDF2.PdfFileReader(pdffileobj1)
print(PdfReader)

<class 'PyPDF2._reader.PdfReader'>


In [39]:
x=pdfreader.numPages
#text=pdfreader.extractText()
#text
x

31

In [46]:
pageobj=pdfreader.getPage(x-1)


In [47]:
text=pageobj.extractText()
text

'2021 TASK FORCE ON CLIMATE-RELATED FINANCIAL DISCLOSURES REPORT31\n Contact us \nFor questions or comments regarding this report,  please contact Sustainability@LibertyMutual.com.\nCautionary statement regarding this reportThis report has been prepared solely for informational purposes from sources understood by the Company to be reliable at the time included in this report. Liberty Mutual Group (the Company) does not guarantee the accuracy, completeness, timeliness, or availability of the contents of this report. The Company is not responsible for any errors or omissions, regardless of the cause, for the results obtained from the use of the contents of this report. In no event shall the Company be liable to any party for any direct, indirect, incidental, exemplary, compensatory, punitive, special, or consequential damages, costs, expenses, legal fees, or losses (including, without limitation, lost income or lost profits and opportunity costs or losses caused by negligence) in connect