In [0]:
# reference to Databricks file system, ls - list of files and subfolders in given folder
dbutils.fs.ls('dbfs:/FileStore/tables/stockwatch')

[FileInfo(path='dbfs:/FileStore/tables/stockwatch/stockwatch_20231230-1.xlsx', name='stockwatch_20231230-1.xlsx', size=69046, modificationTime=1707999144000),
 FileInfo(path='dbfs:/FileStore/tables/stockwatch/stockwatch_20231230.xlsx', name='stockwatch_20231230.xlsx', size=107625, modificationTime=1708001127000)]

In [0]:
# move a file inside the Databricks file system, first you provide file path, then target folder as second argument 
dbutils.fs.mv('dbfs:/FileStore/tables/stockwatch_20231230-1.xlsx', 'dbfs:/FileStore/tables/stockwatch')

True

In [0]:
%pip install openpyxl 
import openpyxl # Q1 - how to make it work, even when library is not install and run quickly that check
import pandas as pd

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Definition of a list of strings to be removed (in loop) from values of columns (which will be provided to "x" parameter)
forbidden_strings = ['\xa0']

def price_cleansing(x, forbidden_strings=forbidden_strings):
    x = x.replace(',','.')
    for fbdn_str in forbidden_strings:
        x = x.replace(fbdn_str, '')
    x = float(x.split()[0])
    return x

In [0]:
## Get the latest file from "stockwatch"
# Get the list of all files in the directory
list_of_files = dbutils.fs.ls("dbfs:/FileStore/tables/stockwatch/")

# Sort the list of files based on date in descending order
sorted_files = sorted(list_of_files, key=lambda x: x.modificationTime, reverse=True)

# Get the latest file
latest_file = sorted_files[0].path

dbfs:/FileStore/tables/stockwatch/stockwatch_20231230.xlsx


In [0]:
# # reference path the input file that was recently uploaded
# approach #1 - hardcoded path
# file_path = "/dbfs/FileStore/tables/stockwatch/stockwatch_20231230-1.xlsx"

# approach #2 - use results from previous cell results
file_path = latest_file
file_date = file_path.split("/")[-1].split("_")[-1][:8]
df = pd.read_excel(file_path, header=0, sheet_name="stocks")

# displays top 5 records from a dataframe
# df.head()

# # Column renaming - approach 1 - change column names
# df.rename(columns = {'tablescraper-selected-row':'Spółka',
#                      'tablescraper-selected-row href':'Rekomendacje wydane dla Spółki (URL)',
#                      'r':'Cena docelowa',
#                      'r 2':'Obecny kurs',
#                      'c':'Data wydania rekomendacji',
#                      'r 3':'Kurs w dniu wydania',
#                      'c 2':'Instytucja',
#                      'c href':'Rekomendacje Domu Maklerskiego (URL)',
#                      'btn href':'Raportu (URL)',
#                      'c 3':'Rekomendacja'
#                      }, inplace = True)

# Column renaming - approach 2 - read mapping from source file - horizontal
mapping = pd.read_excel(file_path, header=0, sheet_name="mapping", nrows=1)
# dictionary comprehension - assigning values to the keys from the header
col_name_dict = {i: mapping[i].tolist()[0] for i in mapping}
df.rename(columns=col_name_dict, inplace=True)

# approach 2.1 - if mapping would be vertical - iterating over row indices would be needed in order to map the key:value pairs of the column names


# # remove empty rows with "NaN" values from Columns
df.dropna(inplace=True)

# add source file date as a column (observation date)
df["Data obserwacji"] = pd.to_datetime(file_date)

# change data type of a column to a "DateTime"
df["Data wydania rekomendacji"] = pd.to_datetime(df["Data wydania rekomendacji"])


# add 'Waluta' and rearrange the order of columns
df["Waluta"] = df["Cena docelowa"].map(lambda x: x.split()[1].lower())
cols = df.columns.tolist()
cols[4], cols[5:] = cols[-1], cols[4:-1]
df = df[cols]

# drop columns with URL's in its name
url_cols = [col for col in cols if "URL" in col]
df.drop(columns=url_cols, inplace=True)

# filter out currencies other than "zł"
df = df.loc[df["Waluta"] == "zł"]
df.drop(columns=["Waluta"], inplace=True)

# Conversion of prices to floats
# df['Cena docelowa'] = df['Cena docelowa'].map(lambda x: float(x.replace(',','.').replace('\xa0','').split()[0]))
for col in ["Cena docelowa", "Obecny kurs", "Kurs w dniu wydania"]:
    df[col] = df[col].map(price_cleansing)

# # print top5 results
# df.head(5)

# # print schema
df.info(verbose=True)
df.display()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 558 entries, 12 to 679
Data columns (total 9 columns):
 #   Column                     Non-Null Count  Dtype         
---  ------                     --------------  -----         
 0   id                         558 non-null    int64         
 1   Spółka                     558 non-null    object        
 2   Cena docelowa              558 non-null    float64       
 3   Obecny kurs                558 non-null    float64       
 4   Data wydania rekomendacji  558 non-null    datetime64[ns]
 5   Kurs w dniu wydania        558 non-null    float64       
 6   Instytucja                 558 non-null    object        
 7   Rekomendacja               558 non-null    object        
 8   Data obserwacji            558 non-null    datetime64[ns]
dtypes: datetime64[ns](2), float64(3), int64(1), object(3)
memory usage: 43.6+ KB


id,Spółka,Cena docelowa,Obecny kurs,Data wydania rekomendacji,Kurs w dniu wydania,Instytucja,Rekomendacja,Data obserwacji
13,CCC,69.5,61.18,2023-12-11T00:00:00Z,58.34,DM BOŚ SA,kupuj,2023-12-30T00:00:00Z
24,ERBUD,43.2,42.8,2023-12-11T00:00:00Z,35.2,DM BOŚ SA,kupuj,2023-12-30T00:00:00Z
25,BUDIMEX,490.0,621.0,2023-12-11T00:00:00Z,580.0,DM BOŚ SA,sprzedaj,2023-12-30T00:00:00Z
30,PGE,11.0,8.65,2023-12-11T00:00:00Z,8.94,DM BOŚ SA,neutralnie,2023-12-30T00:00:00Z
31,PKNORLEN,99.0,65.68,2023-12-11T00:00:00Z,63.37,DM BOŚ SA,neutralnie,2023-12-30T00:00:00Z
33,VOTUM,81.3,45.8,2023-12-10T00:00:00Z,43.7,DM BOŚ SA,kupuj,2023-12-30T00:00:00Z
34,VOXEL,85.3,79.0,2023-12-10T00:00:00Z,74.2,DM BOŚ SA,kupuj,2023-12-30T00:00:00Z
36,FERRO,34.7,31.0,2023-12-10T00:00:00Z,31.6,DM BOŚ SA,kupuj,2023-12-30T00:00:00Z
37,DATAWALK,53.0,37.0,2023-12-08T00:00:00Z,32.5,DM BOŚ SA,trzymaj,2023-12-30T00:00:00Z
38,CLOUD,71.0,66.8,2023-12-08T00:00:00Z,68.0,DM BOŚ SA,trzymaj,2023-12-30T00:00:00Z


In [0]:
'''
nadanie indeksu "-3" aby pozbyć się przyrostka " zł" na końcu ciągu znaków
zmiana separatora dziesiętnego na kropkę
zmiana typu danych na zmiennoprzecinkowy
"map(lambda x: )" - wdrożenie funkcji z jej bezpośrednim użyciem
'''
df['Cena docelowa'].map(lambda x: float(x.replace(',','.')[:-3]))

12      69.50
23      43.20
24     490.00
29      11.00
30      99.00
        ...  
675     75.00
676      8.40
677    105.00
678    399.35
679    155.55
Name: Cena docelowa, Length: 567, dtype: float64

In [0]:
# %pip install openpyxl
# import pandas as pd
# import openpyxl
# from pyspark.sql import SparkSession

# # Create a Spark Session
# spark = SparkSession.builder.getOrCreate()

# # Reference the path to the input file that was recently uploaded
# file_path = latest_file
# df = pd.read_excel(file_path)

# # Convert pandas DataFrame to Spark DataFrame
# spark_df = spark.createDataFrame(df)

# # Change column names
# spark_df = spark_df.withColumnRenamed('tablescraper-selected-row', 'Spółka') \
#     .withColumnRenamed('tablescraper-selected-row href', 'Rekomendacje wydane dla Spółki (URL)') \
#     .withColumnRenamed('r', 'Cena docelowa') \
#     .withColumnRenamed('r 2', 'Obecny kurs') \
#     .withColumnRenamed('c', 'Data wydania rekomendacji') \
#     .withColumnRenamed('r 3', 'Kurs w dniu wydania') \
#     .withColumnRenamed('c 2', 'Instytucja') \
#     .withColumnRenamed('c href', 'Rekomendacje Domu Maklerskiego (URL)') \
#     .withColumnRenamed('btn href', 'Raportu (URL)') \
#     .withColumnRenamed('c 3', 'Rekomendacja')

# # Filter rows where 'Rekomendacja' is not null
# filtered_df = spark_df.filter(spark_df['Rekomendacja'].isNotNull())

# # Replace " zl" values to empty ones
# filtered_df['Kurs w dniu wydania'].map({' zl': ''}).fillna(filtered_df['Kurs w dniu wydania'])

# # Print top 5 rows
# filtered_df.show(5)

# # Print the schema
# #filtered_df.printSchema()

com.databricks.backend.common.rpc.SparkStoppedException: Spark down: 
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:653)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:690)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$runInnerLoop$1(DriverWrapper.scala:520)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
	at com.databricks.backend.daemon.driver.DriverWrapper.withAttributionContext(DriverWrapper.scala:66)
	at com.databricks.ba